IGNITE-5452: GridTimeoutProcessor can hang on stop. This closes #2279.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b95c261f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b95c261f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b95c261f

Branch: refs/heads/ignite-5578-locJoin
Commit: b95c261f0b1376e8523dd1d89f253a5874dbf63b
Parents: 07cc05f
Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Authored: Fri Jul 14 20:14:47 2017 +0300
Committer: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Committed: Fri Jul 14 20:14:47 2017 +0300

----------------------------------------------------------------------
 .../timeout/GridTimeoutProcessor.java           |  18 +-
 .../IgniteTxRemoveTimeoutObjectsTest.java       | 194 +++++++++++++++++++
 .../timeout/GridTimeoutProcessorSelfTest.java   |  68 +++++--
 .../testsuites/IgniteCacheTestSuite3.java       |   4 +-
 4 files changed, 265 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 9deca9a..8c71f76 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -37,7 +37,7 @@ import org.apache.ignite.thread.IgniteThread;
  */
 public class GridTimeoutProcessor extends GridProcessorAdapter {
     /** */
-    private final IgniteThread timeoutWorker;
+    private final TimeoutWorker timeoutWorker;
 
     /** Time-based sorted set for timeout objects. */
     private final GridConcurrentSkipListSet<GridTimeoutObject> timeoutObjs =
@@ -62,13 +62,12 @@ public class GridTimeoutProcessor extends 
GridProcessorAdapter {
     public GridTimeoutProcessor(GridKernalContext ctx) {
         super(ctx);
 
-        timeoutWorker = new IgniteThread(ctx.config().getIgniteInstanceName(), 
"grid-timeout-worker",
-            new TimeoutWorker());
+        timeoutWorker = new TimeoutWorker();
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        timeoutWorker.start();
+        new IgniteThread(timeoutWorker).start();
 
         if (log.isDebugEnabled())
             log.debug("Timeout processor started.");
@@ -76,7 +75,7 @@ public class GridTimeoutProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        U.interrupt(timeoutWorker);
+        timeoutWorker.cancel();
         U.join(timeoutWorker);
 
         if (log.isDebugEnabled())
@@ -159,6 +158,13 @@ public class GridTimeoutProcessor extends 
GridProcessorAdapter {
                             timeoutObj.onTimeout();
                         }
                         catch (Throwable e) {
+                            if (isCancelled() && !(e instanceof Error)){
+                                if (log.isDebugEnabled())
+                                    log.debug("Error when executing timeout 
callback: " + timeoutObj);
+
+                                return;
+                            }
+
                             U.error(log, "Error when executing timeout 
callback: " + timeoutObj, e);
 
                             if (e instanceof Error)
@@ -170,7 +176,7 @@ public class GridTimeoutProcessor extends 
GridProcessorAdapter {
                 }
 
                 synchronized (mux) {
-                    while (true) {
+                    while (!isCancelled()) {
                         // Access of the first element must be inside of
                         // synchronization block, so we don't miss out
                         // on thread notification events sent from

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
new file mode 100644
index 0000000..c0f9940
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxRemoveTimeoutObjectsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Test correctness of rollback a transaction with timeout during the grid 
stop.
+ */
+public class IgniteTxRemoveTimeoutObjectsTest extends 
GridCacheAbstractSelfTest {
+    /** */
+    private static final int PUT_CNT = 1000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60_000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRemoveTimeoutObjects() throws Exception {
+        IgniteCache<Integer, Integer> cache0 = 
grid(0).cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> cache1 = 
grid(1).cache(DEFAULT_CACHE_NAME);
+
+        // start additional grid to be closed.
+        IgniteCache<Integer, Integer> cacheAdditional = 
startGrid(gridCount()).cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < PUT_CNT; i++)
+            cache0.put(i, Integer.MAX_VALUE);
+
+        logTimeoutObjectsFrequency();
+
+        info("Tx1 started");
+        try (Transaction tx = 
grid(gridCount()).transactions().txStart(PESSIMISTIC, SERIALIZABLE, 100, 
PUT_CNT)) {
+            try {
+                for (int i = 0; i < PUT_CNT; i++) {
+                    cacheAdditional.put(i, Integer.MIN_VALUE);
+
+                    if (i % 100 == 0)
+                        logTimeoutObjectsFrequency();
+                }
+
+                U.sleep(200);
+
+                tx.commit();
+
+                fail("A timeout should have happened.");
+            }
+            catch (Exception e) {
+                assertTrue(X.hasCause(e, TransactionTimeoutException.class));
+            }
+        }
+
+        assertDoesNotContainLockTimeoutObjects();
+
+        logTimeoutObjectsFrequency();
+
+        stopGrid(gridCount());
+
+        awaitPartitionMapExchange();
+
+        info("Grid2 closed.");
+
+        assertDoesNotContainLockTimeoutObjects();
+
+        logTimeoutObjectsFrequency();
+
+        // Check that the values have not changed and lock can be acquired.
+        try (Transaction tx2 = grid(1).transactions().txStart(PESSIMISTIC, 
SERIALIZABLE)) {
+            info("Tx2 started");
+
+            for (int i = 0; i < PUT_CNT; i++) {
+                assertEquals(cache1.get(i).intValue(), Integer.MAX_VALUE);
+                cache1.put(i, i);
+
+                if (i % (PUT_CNT / 5) == 0)
+                    logTimeoutObjectsFrequency();
+            }
+
+            tx2.commit();
+        }
+
+        info("Tx2 stopped");
+
+        // Check that that changes committed.
+        for (int i = 0; i < PUT_CNT; i++)
+            assertEquals(cache0.get(i).intValue(), i);
+    }
+
+    /**
+     * Fails if at least one grid contains LockTimeoutObjects.
+     */
+    private void assertDoesNotContainLockTimeoutObjects() {
+        for (Ignite ignite : G.allGrids()) {
+            for (GridTimeoutObject object : 
getTimeoutObjects((IgniteEx)ignite)) {
+                if 
(object.getClass().getSimpleName().equals("LockTimeoutObject"))
+                    fail("Grids contain LockTimeoutObjects.");
+            }
+        }
+    }
+
+    /**
+     * Print the number of each timeout object type on each grid to the log.
+     */
+    private void logTimeoutObjectsFrequency() {
+        StringBuilder sb = new StringBuilder("Timeout objects frequency [");
+
+        for (Ignite ignite : G.allGrids()) {
+            IgniteEx igniteEx = (IgniteEx)ignite;
+
+            Map<String, Integer> objFreqMap = new HashMap<>();
+
+            Set<GridTimeoutObject> objs = getTimeoutObjects(igniteEx);
+
+            for (GridTimeoutObject obj : objs) {
+                String clsName = obj.getClass().getSimpleName();
+
+                Integer cnt = objFreqMap.get(clsName);
+
+                if (cnt == null)
+                    objFreqMap.put(clsName, 1);
+                else
+                    objFreqMap.put(clsName, cnt + 1);
+            }
+
+            sb.append("[")
+                .append(igniteEx.name()).append(": size=")
+                .append(objs.size()).append(", ");
+
+            for (Map.Entry<String, Integer> entry : objFreqMap.entrySet()) {
+                sb.append(entry.getKey()).append("=")
+                    .append(entry.getValue())
+                    .append(", ");
+            }
+
+            sb.delete(sb.length() - 2, sb.length())
+                .append("]; ");
+        }
+
+        sb.delete(sb.length() - 2, sb.length())
+            .append("]");
+
+        info(sb.toString()
+            .replaceAll("distributed.IgniteTxRollbackOnStopTest", "Grid"));
+    }
+
+    /**
+     * @param igniteEx IgniteEx.
+     * @return Set of timeout objects that process on current IgniteEx.
+     */
+    private Set<GridTimeoutObject> getTimeoutObjects(IgniteEx igniteEx) {
+        GridTimeoutProcessor timeout = igniteEx.context().timeout();
+
+        return GridTestUtils.getFieldValue(timeout, timeout.getClass(), 
"timeoutObjs");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
index eb248cf..606b102 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessorSelfTest.java
@@ -41,6 +41,11 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
     private GridTestKernalContext ctx;
 
     /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60_000;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         ctx = newContext();
 
@@ -84,7 +89,9 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
                 }
 
                 /** {@inheritDoc} */
-                @Override public long endTime() { return endTime; }
+                @Override public long endTime() {
+                    return endTime;
+                }
 
                 /** {@inheritDoc} */
                 @Override public void onTimeout() {
@@ -152,10 +159,14 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
                         private final long endTime = 
System.currentTimeMillis() + RAND.nextInt(1000) + 500;
 
                         /** {@inheritDoc} */
-                        @Override public IgniteUuid timeoutId() { return id; }
+                        @Override public IgniteUuid timeoutId() {
+                            return id;
+                        }
 
                         /** {@inheritDoc} */
-                        @Override public long endTime() { return endTime; }
+                        @Override public long endTime() {
+                            return endTime;
+                        }
 
                         /** {@inheritDoc} */
                         @Override public void onTimeout() {
@@ -307,9 +318,8 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
         assert timeObjs.size() == max;
 
         // Remove timeout objects so that they aren't able to times out 
(supposing the cycle takes less than 500 ms).
-        for (GridTimeoutObject obj : timeObjs) {
+        for (GridTimeoutObject obj : timeObjs)
             ctx.timeout().removeTimeoutObject(obj);
-        }
 
         Thread.sleep(1000);
 
@@ -350,7 +360,9 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
                         }
 
                         /** {@inheritDoc} */
-                        @Override public long endTime() { return endTime; }
+                        @Override public long endTime() {
+                            return endTime;
+                        }
 
                         /** {@inheritDoc} */
                         @Override public void onTimeout() {
@@ -370,9 +382,8 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
 
                 // Remove timeout objects so that they aren't able to times out
                 // (supposing the cycle takes less than 500 ms).
-                for (GridTimeoutObject obj : timeObjs) {
+                for (GridTimeoutObject obj : timeObjs)
                     ctx.timeout().removeTimeoutObject(obj);
-                }
             }
         }, threads, "timeout-test-worker");
 
@@ -381,6 +392,9 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
         assert callCnt.get() == 0;
     }
 
+    /**
+     * @throws Exception If test failed.
+     */
     public void testAddRemoveInterleaving() throws Exception {
         final AtomicInteger callCnt = new AtomicInteger(0);
 
@@ -430,9 +444,8 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
 
                 // Remove timeout objects so that they aren't able to times out
                 // (supposing the cycle takes less than 500 ms).
-                for (GridTimeoutObject obj : timeObjs) {
+                for (GridTimeoutObject obj : timeObjs)
                     ctx.timeout().removeTimeoutObject(obj);
-                }
             }
         }, 100, "timeout-test-worker");
 
@@ -516,10 +529,14 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
             private int cnt;
 
             /** {@inheritDoc} */
-            @Override public IgniteUuid timeoutId() { return id; }
+            @Override public IgniteUuid timeoutId() {
+                return id;
+            }
 
             /** {@inheritDoc} */
-            @Override public long endTime() { return endTime; }
+            @Override public long endTime() {
+                return endTime;
+            }
 
             /** {@inheritDoc} */
             @Override public void onTimeout() {
@@ -608,4 +625,31 @@ public class GridTimeoutProcessorSelfTest extends 
GridCommonAbstractTest {
 
         assert latch.await(3000, MILLISECONDS);
     }
+
+    /**
+     * Test that eaten {@link InterruptedException} will not hang on the 
closing of the grid.
+     *
+     * @throws Exception If test failed.
+     */
+    public void testCancelingWithClearedInterruptedFlag() throws Exception {
+        final CountDownLatch onTimeoutCalled = new CountDownLatch(1);
+
+        ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter(10) {
+            /** {@inheritDoc} */
+            @Override public void onTimeout() {
+                try {
+                    onTimeoutCalled.countDown();
+
+                    // Wait for CacheProcessor has stopped and cause 
InterruptedException
+                    // which clears interrupted flag.
+                    Thread.sleep(Long.MAX_VALUE);
+                }
+                catch (InterruptedException ignore) {
+                    // No-op.
+                }
+            }
+        });
+
+        onTimeoutCalled.await();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b95c261f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 58e9dc3..a6be07e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -36,9 +36,9 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheGroupsTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheInterceptorSelfTestSuite;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheScanPredicateDeploymentSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheAsyncOperationsTest;
-import 
org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheMixedModeSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteTxGetAfterStopTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.IgniteTxRemoveTimeoutObjectsTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDaemonNodePartitionedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedOnlyP2PDisabledByteArrayValuesSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedOnlyP2PEnabledByteArrayValuesSelfTest;
@@ -199,6 +199,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(CacheAsyncOperationsTest.class);
 
+        suite.addTestSuite(IgniteTxRemoveTimeoutObjectsTest.class);
+
         return suite;
     }
 }

Reply via email to