Repository: ignite Updated Branches: refs/heads/ignite-9213 [created] 9b1e34961
ignite-9213 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b1e3496 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b1e3496 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b1e3496 Branch: refs/heads/ignite-9213 Commit: 9b1e349615feee3caf00b9ecd7c0d77abe035542 Parents: 41f769e Author: sboikov <[email protected]> Authored: Thu Dec 13 23:47:25 2018 +0300 Committer: sboikov <[email protected]> Committed: Thu Dec 13 23:50:52 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 4 +- .../CacheLockReleaseNodeLeaveTest.java | 108 ++++++++++++++++++- 3 files changed, 107 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9b1e3496/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 96158d7..364950c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -763,7 +763,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana stopErr = cctx.kernalContext().clientDisconnected() ? new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(), "Client node disconnected: " + cctx.igniteInstanceName()) : - new IgniteInterruptedCheckedException("Node is stopping: " + cctx.igniteInstanceName()); + new IgniteCheckedException("Node is stopping: " + cctx.igniteInstanceName()); // Stop exchange worker U.cancel(exchWorker); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b1e3496/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 89e03a2..5b9ebf1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -843,7 +843,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']'); } catch (IgniteInterruptedCheckedException e) { - onDone(e); + assert cctx.kernalContext().isStopping(); + + onDone(new IgniteCheckedException("Node stopped")); throw e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b1e3496/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java index 2d5f5fe..409512d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLockReleaseNodeLeaveTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import org.apache.ignite.Ignite; @@ -144,8 +145,6 @@ public class CacheLockReleaseNodeLeaveTest extends GridCommonAbstractTest { */ @Test public void testLockTopologyChange() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9213"); - final int nodeCnt = 5; int threadCnt = 8; final int keys = 100; @@ -167,7 +166,12 @@ public class CacheLockReleaseNodeLeaveTest extends GridCommonAbstractTest { Lock lock = cache.lock(i); lock.lock(); - cache.put(i, i); + try { + cache.put(i, i); + } + finally { + lock.unlock(); + } lock.unlock(); } @@ -184,8 +188,102 @@ public class CacheLockReleaseNodeLeaveTest extends GridCommonAbstractTest { IgniteInternalFuture<Long> f; - while ((f = q.poll()) != null) - f.get(2_000); + Exception err = null; + + while ((f = q.poll()) != null) { + try { + f.get(60_000); + } + catch (Exception e) { + error("Test operation failed: " + e, e); + + if (err == null) + err = e; + } + } + + if (err != null) + fail("Test operation failed, see log for details"); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + @Test + public void testLockNodeStop() throws Exception { + final int nodeCnt = 3; + int threadCnt = 2; + final int keys = 100; + + try { + final AtomicBoolean stop = new AtomicBoolean(false); + + Queue<IgniteInternalFuture<Long>> q = new ArrayDeque<>(nodeCnt); + + for (int i = 0; i < nodeCnt; i++) { + final Ignite ignite = startGrid(i); + + IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + while (!Thread.currentThread().isInterrupted() && !stop.get()) { + try { + IgniteCache<Integer, Integer> cache = ignite.cache(REPLICATED_TEST_CACHE); + + for (int i = 0; i < keys; i++) { + Lock lock = cache.lock(i); + lock.lock(); + + try { + cache.put(i, i); + } + finally { + lock.unlock(); + } + } + } + catch (Exception e) { + log.info("Ignore error: " + e); + + break; + } + } + } + }, threadCnt, "test-lock-thread"); + + q.add(f); + + U.sleep(1_000); + } + + U.sleep(ThreadLocalRandom.current().nextLong(500) + 500); + + // Stop all nodes, check that threads executing cache operations do not hang. + stopAllGrids(); + + stop.set(true); + + IgniteInternalFuture<Long> f; + + Exception err = null; + + while ((f = q.poll()) != null) { + try { + f.get(60_000); + } + catch (Exception e) { + error("Test operation failed: " + e, e); + + if (err == null) + err = e; + } + } + + if (err != null) + fail("Test operation failed, see log for details"); } finally { stopAllGrids();
