Repository: ignite
Updated Branches:
  refs/heads/master eca5c5a41 -> 6a8a2ff82


IGNITE-8564 Fixed DataStreamer reconnect to a restarted cluster - Fixes #4296.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a8a2ff8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a8a2ff8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a8a2ff8

Branch: refs/heads/master
Commit: 6a8a2ff82e5e04877ecd74336e33690439dc65a5
Parents: eca5c5a
Author: Ilya Kasnacheev <ilya.kasnach...@gmail.com>
Authored: Fri Jul 13 20:36:21 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Jul 13 20:36:21 2018 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityProcessor.java         |  43 +++---
 ...rClientReconnectAfterClusterRestartTest.java | 139 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 3 files changed, 164 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8a2ff8/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index e26c0ce..08333c3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
@@ -67,7 +68,6 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
 import static 
org.apache.ignite.internal.processors.affinity.GridAffinityUtils.affinityJob;
@@ -97,7 +97,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         @Override public void onEvent(Event evt) {
             int evtType = evt.type();
 
-            assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT || 
evtType == EVT_NODE_JOINED;
+            assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT;
 
             if (affMap.isEmpty())
                 return; // Skip empty affinity map.
@@ -105,26 +105,24 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             final DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 
             // Clean up affinity functions if such cache no more exists.
-            if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) {
-                final Collection<String> caches = ctx.cache().cacheNames();
+            final Collection<String> caches = ctx.cache().cacheNames();
 
-                final Collection<AffinityAssignmentKey> rmv = new HashSet<>();
+            final Collection<AffinityAssignmentKey> rmv = new HashSet<>();
 
-                for (AffinityAssignmentKey key : affMap.keySet()) {
-                    if (!caches.contains(key.cacheName) || 
key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10)
-                        rmv.add(key);
-                }
+            for (AffinityAssignmentKey key : affMap.keySet()) {
+                if (!caches.contains(key.cacheName) || 
key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10)
+                    rmv.add(key);
+            }
 
-                if (!rmv.isEmpty()) {
-                    ctx.timeout().addTimeoutObject(
-                        new GridTimeoutObjectAdapter(
-                            IgniteUuid.fromUuid(ctx.localNodeId()),
-                            AFFINITY_MAP_CLEAN_UP_DELAY) {
-                                @Override public void onTimeout() {
-                                    affMap.keySet().removeAll(rmv);
-                                }
-                            });
-                }
+            if (!rmv.isEmpty()) {
+                ctx.timeout().addTimeoutObject(
+                    new GridTimeoutObjectAdapter(
+                        IgniteUuid.fromUuid(ctx.localNodeId()),
+                        AFFINITY_MAP_CLEAN_UP_DELAY) {
+                            @Override public void onTimeout() {
+                                affMap.keySet().removeAll(rmv);
+                            }
+                        });
             }
         }
     };
@@ -140,7 +138,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT, EVT_NODE_JOINED);
+        ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT);
     }
 
     /** {@inheritDoc} */
@@ -148,6 +146,11 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         ctx.event().removeLocalEventListener(lsnr);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws 
IgniteCheckedException {
+        affMap.clear();
+    }
+
     /**
      * @param cacheName Cache name.
      * @param key Key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8a2ff8/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java
new file mode 100644
index 0000000..239647c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests DataStreamer reconnect behaviour when client nodes arrives at the 
same or different topVer than it left.
+ */
+public class DataStreamerClientReconnectAfterClusterRestartTest extends 
GridCommonAbstractTest {
+    /** */
+    public static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>("test"));
+
+        cfg.setClientMode(clientMode);
+
+        return cfg;
+    }
+
+    /** */
+    public void testOneClient() throws Exception {
+        clusterRestart(false, false);
+    }
+
+    /** */
+    public void testOneClientAllowOverwrite() throws Exception {
+        clusterRestart(false, true);
+    }
+
+    /** */
+    public void testTwoClients() throws Exception {
+        clusterRestart(true, false);
+    }
+
+    /** */
+    public void testTwoClientsAllowOverwrite() throws Exception {
+        clusterRestart(true, true);
+    }
+
+    /** */
+    private void clusterRestart(boolean withAnotherClient, boolean 
allowOverwrite) throws Exception {
+        CountDownLatch disconnect = new CountDownLatch(1);
+        CountDownLatch reconnect = new CountDownLatch(1);
+
+        try {
+            startGrid(0);
+
+            clientMode = true;
+
+            Ignite client = startGrid(1);
+
+            if (withAnotherClient) {
+                // Force increase of topVer
+                startGrid(2);
+
+                stopGrid(2);
+            }
+
+            clientMode = false;
+
+            try (IgniteDataStreamer<String, String> streamer = 
client.dataStreamer("test")) {
+                streamer.allowOverwrite(allowOverwrite);
+
+                streamer.addData("k1", "v1");
+            }
+
+            // Restart the cluster so that client reconnects to a new one
+            client.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event event) {
+                    reconnect.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_CLIENT_NODE_RECONNECTED);
+
+            client.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event event) {
+                    disconnect.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_CLIENT_NODE_DISCONNECTED);
+
+            stopGrid(0);
+
+            disconnect.await();
+
+            startGrid(0);
+
+            reconnect.await();
+
+            try (IgniteDataStreamer<String, String> streamer = 
client.dataStreamer("test")) {
+                streamer.allowOverwrite(allowOverwrite);
+
+                streamer.addData("k2", "v2");
+
+                return;
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8a2ff8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1bf65e0..c607284 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -143,6 +143,7 @@ import 
org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExcepti
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
+import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerClientReconnectAfterClusterRestartTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest;
 import 
org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest;
@@ -253,6 +254,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class);
         suite.addTestSuite(DataStreamerImplSelfTest.class);
         suite.addTestSuite(DataStreamerTimeoutTest.class);
+        
suite.addTestSuite(DataStreamerClientReconnectAfterClusterRestartTest.class);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheEntryMemorySizeSelfTest.class, ignoredTests);
         suite.addTestSuite(GridCacheClearAllSelfTest.class);
         suite.addTestSuite(GridCacheObjectToStringSelfTest.class);

Reply via email to