IGNITE-9013 Fail cache future when local node is stopping - Fixes #4369.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85b20027
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85b20027
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85b20027

Branch: refs/heads/ignite-8783
Commit: 85b2002796fb601d7e7ce7d7320943f9323c2bdd
Parents: 66e547a
Author: EdShangGG <[email protected]>
Authored: Tue Jul 17 18:04:38 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Tue Jul 17 18:04:38 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 14 ++++----
 .../distributed/dht/GridDhtTxPrepareFuture.java | 22 ++++--------
 .../service/IgniteServiceReassignmentTest.java  | 38 ++++++++++++++++++++
 3 files changed, 52 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/85b20027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1f0d270..4b8644e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2949,12 +2949,14 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             @Override public V op(GridNearTxLocal tx) throws 
IgniteCheckedException {
                 K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key;
 
-                V ret = tx.removeAllAsync(ctx,
-                    null,
-                    Collections.singletonList(key0),
-                    /*retval*/true,
-                    null,
-                    /*singleRmv*/false).get().value();
+                IgniteInternalFuture<GridCacheReturn> fut = 
tx.removeAllAsync(ctx,
+                        null,
+                        Collections.singletonList(key0),
+                        /*retval*/true,
+                        null,
+                        /*singleRmv*/false);
+
+                V ret = fut.get().value();
 
                 if (ctx.config().getInterceptor() != null) {
                     K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, 
true, false) : key0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/85b20027/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 622774d..0beff6c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1385,23 +1385,13 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                 fut.onNodeLeft();
             }
             catch (IgniteCheckedException e) {
-                if (!cctx.kernalContext().isStopping()) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DHT prepare fut, failed to send request 
dht [txId=" + tx.nearXidVersion() +
-                            ", dhtTxId=" + tx.xidVersion() +
-                            ", node=" + n.id() + ']');
-                    }
-
-                    fut.onResult(e);
-                }
-                else {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DHT prepare fut, failed to send request 
dht, ignore [txId=" + tx.nearXidVersion() +
-                            ", dhtTxId=" + tx.xidVersion() +
-                            ", node=" + n.id() +
-                            ", err=" + e + ']');
-                    }
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT prepare fut, failed to send request dht 
[txId=" + tx.nearXidVersion() +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + n.id() + ']');
                 }
+
+                fut.onResult(e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/85b20027/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
index 865f121..e74b27d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceReassignmentTest.java
@@ -244,6 +244,44 @@ public class IgniteServiceReassignmentTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNodeStopWhileThereAreCacheActivitiesInServiceProcessor() 
throws Exception {
+        final int nodesCnt = 2;
+        final int maxSvc = 1024;
+
+        startGridsMultiThreaded(nodesCnt);
+
+        IgniteEx ignite = grid(0);
+
+        IgniteInternalCache<GridServiceAssignmentsKey, Object> sysCache = 
ignite.utilityCache();
+
+        // Adding some assignments without deployments.
+        for (int i = 0; i < maxSvc; i++) {
+            String name = "svc-" + i;
+
+            ServiceConfiguration svcCfg = new ServiceConfiguration();
+
+            svcCfg.setName(name);
+
+            GridServiceAssignmentsKey key = new 
GridServiceAssignmentsKey(name);
+
+            UUID nodeId = grid(i % nodesCnt).localNode().id();
+
+            sysCache.put(key, new GridServiceAssignments(svcCfg, nodeId, 
ignite.cluster().topologyVersion()));
+        }
+
+        // Simulate exchange with merge.
+        GridTestUtils.runAsync(() -> startGrid(nodesCnt));
+        GridTestUtils.runAsync(() -> startGrid(nodesCnt + 1));
+        startGrid(nodesCnt + 2);
+
+        Thread.sleep((int)(1000 * ThreadLocalRandom.current().nextDouble()));
+
+        stopAllGrids();
+    }
+
+    /**
      * @param node Node.
      * @throws Exception If failed.
      */

Reply via email to