IGNITE-5195 DataStreamer can fails if non-data node enter\leave the grid. This 
closes #3026.

Signed-off-by: nikolay_tikhonov <ntikho...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fb04be3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fb04be3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fb04be3

Branch: refs/heads/ignite-zk
Commit: 5fb04be3942d8a90288fdc75f86693872c918e2d
Parents: 43aa4a8
Author: nikolay_tikhonov <ntikho...@gridgain.com>
Authored: Mon Nov 20 18:37:44 2017 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Mon Nov 20 18:39:03 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAffinityManager.java         |  7 ++-
 .../datastreamer/DataStreamerImpl.java          | 20 ++++++-
 .../cache/IgniteCacheDynamicStopSelfTest.java   |  2 +-
 .../datastreamer/DataStreamerImplSelfTest.java  | 60 ++++++++++++++++++++
 4 files changed, 85 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 14a1344..c9ee38c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -123,7 +123,12 @@ public class GridCacheAffinityManager extends 
GridCacheManagerAdapter {
         if (cctx.isLocal())
             topVer = LOC_CACHE_TOP_VER;
 
-        return aff.assignments(topVer);
+        GridAffinityAssignmentCache aff0 = aff;
+
+        if (aff0 == null)
+            throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + 
cctx.name());
+
+        return aff0.assignments(topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index d38132f..12eb2dc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -779,6 +779,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             else
                 topVer = 
ctx.cache().context().exchange().readyAffinityVersion();
 
+            List<List<ClusterNode>> assignments = 
cctx.affinity().assignments(topVer);
+
             if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx 
required.
                 gate = cctx.gate();
 
@@ -956,7 +958,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                     final List<GridFutureAdapter<?>> futs;
 
                     try {
-                        futs = buf.update(entriesForNode, topVer, opFut, 
remap);
+                        futs = buf.update(entriesForNode, topVer, assignments, 
opFut, remap);
 
                         opFut.markInitialized();
                     }
@@ -1411,6 +1413,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         @Nullable List<GridFutureAdapter<?>> update(
             Iterable<DataStreamerEntry> newEntries,
             AffinityTopologyVersion topVer,
+            List<List<ClusterNode>> assignments,
             GridCompoundFuture opFut,
             boolean remap
         ) throws IgniteInterruptedCheckedException {
@@ -1441,9 +1444,19 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                         futs[b.partId] = curFut0;
                     }
 
-                    if (b.batchTopVer == null)
+                    if (b.batchTopVer == null) {
+                        b.batchTopVer = topVer;
+
+                        b.assignments = assignments;
+                    }
+
+                    // topology changed, but affinity is the same, no re-map 
is required.
+                    if (!topVer.equals(b.batchTopVer) && 
b.assignments.equals(assignments)) {
                         b.batchTopVer = topVer;
 
+                        b.assignments = assignments;
+                    }
+
                     curBatchTopVer = b.batchTopVer;
 
                     b.entries.add(entry);
@@ -2186,6 +2199,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         /** */
         private final IgniteInClosure<? super IgniteInternalFuture<Object>> 
signalC;
 
+        /** Batch assignments */
+        public List<List<ClusterNode>> assignments;
+
         /**
          * @param partId Partition ID.
          * @param c Signal closure.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 5628c4d..44cd475 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -142,4 +142,4 @@ public class IgniteCacheDynamicStopSelfTest extends 
GridCommonAbstractTest {
 
         ignite(0).destroyCache(DEFAULT_CACHE_NAME);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index e90f6b0..940f8ce 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
@@ -38,6 +39,7 @@ import org.apache.ignite.cache.CacheServerNotFoundException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -431,6 +433,64 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testClientEventsNotCausingRemaps() throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        IgniteDataStreamer<Object, Object> streamer = 
ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+        ((DataStreamerImpl)streamer).maxRemapCount(3);
+
+        streamer.addData(1, 1);
+
+        for (int topChanges = 0; topChanges < 30; topChanges++) {
+            IgniteEx node = 
startGrid(getConfiguration("flapping-client").setClientMode(true));
+
+            streamer.addData(1, 1);
+
+            node.close();
+
+            streamer.addData(1, 1);
+        }
+
+        streamer.flush();
+        streamer.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerEventsCauseRemaps() throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        IgniteDataStreamer<Object, Object> streamer = 
ignite.dataStreamer(DEFAULT_CACHE_NAME);
+
+        ((DataStreamerImpl)streamer).maxRemapCount(0);
+
+        streamer.addData(1, 1);
+
+        startGrid(2);
+
+        try {
+            streamer.addData(1, 1);
+
+            streamer.flush();
+        }
+        catch (IllegalStateException ex) {
+            assert ex.getMessage().contains("Data streamer has been closed");
+
+            return;
+        }
+
+        fail("Expected exception wasn't thrown");
+    }
+
+    /**
      * Gets cache configuration.
      *
      * @return Cache configuration.

Reply via email to