IGNITE-9013 Fail cache future when local node is stopping - Fixes #4369.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85b20027 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85b20027 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85b20027 Branch: refs/heads/ignite-8783 Commit: 85b2002796fb601d7e7ce7d7320943f9323c2bdd Parents: 66e547a Author: EdShangGG <[email protected]> Authored: Tue Jul 17 18:04:38 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Jul 17 18:04:38 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 14 ++++---- .../distributed/dht/GridDhtTxPrepareFuture.java | 22 ++++-------- .../service/IgniteServiceReassignmentTest.java | 38 ++++++++++++++++++++ 3 files changed, 52 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/85b20027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 1f0d270..4b8644e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2949,12 +2949,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException { K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key; - V ret = tx.removeAllAsync(ctx, - null, - Collections.singletonList(key0), - /*retval*/true, - null, - /*singleRmv*/false).get().value(); + IgniteInternalFuture<GridCacheReturn> fut = tx.removeAllAsync(ctx, + null, + Collections.singletonList(key0), + /*retval*/true, + null, + /*singleRmv*/false); + + V ret = fut.get().value(); if (ctx.config().getInterceptor() != null) { K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0; http://git-wip-us.apache.org/repos/asf/ignite/blob/85b20027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 622774d..0beff6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1385,23 +1385,13 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite fut.onNodeLeft(); } catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } - - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); - } + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); } + + fut.onResult(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/85b20027/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java index 865f121..e74b27d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java @@ -244,6 +244,44 @@ public class IgniteServiceReassignmentTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testNodeStopWhileThereAreCacheActivitiesInServiceProcessor() throws Exception { + final int nodesCnt = 2; + final int maxSvc = 1024; + + startGridsMultiThreaded(nodesCnt); + + IgniteEx ignite = grid(0); + + IgniteInternalCache<GridServiceAssignmentsKey, Object> sysCache = ignite.utilityCache(); + + // Adding some assignments without deployments. + for (int i = 0; i < maxSvc; i++) { + String name = "svc-" + i; + + ServiceConfiguration svcCfg = new ServiceConfiguration(); + + svcCfg.setName(name); + + GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(name); + + UUID nodeId = grid(i % nodesCnt).localNode().id(); + + sysCache.put(key, new GridServiceAssignments(svcCfg, nodeId, ignite.cluster().topologyVersion())); + } + + // Simulate exchange with merge. + GridTestUtils.runAsync(() -> startGrid(nodesCnt)); + GridTestUtils.runAsync(() -> startGrid(nodesCnt + 1)); + startGrid(nodesCnt + 2); + + Thread.sleep((int)(1000 * ThreadLocalRandom.current().nextDouble())); + + stopAllGrids(); + } + + /** * @param node Node. * @throws Exception If failed. */
