Repository: ignite Updated Branches: refs/heads/master eca5c5a41 -> 6a8a2ff82
IGNITE-8564 Fixed DataStreamer reconnect to a restarted cluster - Fixes #4296. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a8a2ff8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a8a2ff8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a8a2ff8 Branch: refs/heads/master Commit: 6a8a2ff82e5e04877ecd74336e33690439dc65a5 Parents: eca5c5a Author: Ilya Kasnacheev <ilya.kasnach...@gmail.com> Authored: Fri Jul 13 20:36:21 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Jul 13 20:36:21 2018 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityProcessor.java | 43 +++--- ...rClientReconnectAfterClusterRestartTest.java | 139 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 3 files changed, 164 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8a2ff8/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index e26c0ce..08333c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import 
org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; @@ -67,7 +68,6 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; import static org.apache.ignite.internal.processors.affinity.GridAffinityUtils.affinityJob; @@ -97,7 +97,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { @Override public void onEvent(Event evt) { int evtType = evt.type(); - assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_JOINED; + assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT; if (affMap.isEmpty()) return; // Skip empty affinity map. @@ -105,26 +105,24 @@ public class GridAffinityProcessor extends GridProcessorAdapter { final DiscoveryEvent discoEvt = (DiscoveryEvent)evt; // Clean up affinity functions if such cache no more exists. 
- if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { - final Collection<String> caches = ctx.cache().cacheNames(); + final Collection<String> caches = ctx.cache().cacheNames(); - final Collection<AffinityAssignmentKey> rmv = new HashSet<>(); + final Collection<AffinityAssignmentKey> rmv = new HashSet<>(); - for (AffinityAssignmentKey key : affMap.keySet()) { - if (!caches.contains(key.cacheName) || key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10) - rmv.add(key); - } + for (AffinityAssignmentKey key : affMap.keySet()) { + if (!caches.contains(key.cacheName) || key.topVer.topologyVersion() < discoEvt.topologyVersion() - 10) + rmv.add(key); + } - if (!rmv.isEmpty()) { - ctx.timeout().addTimeoutObject( - new GridTimeoutObjectAdapter( - IgniteUuid.fromUuid(ctx.localNodeId()), - AFFINITY_MAP_CLEAN_UP_DELAY) { - @Override public void onTimeout() { - affMap.keySet().removeAll(rmv); - } - }); - } + if (!rmv.isEmpty()) { + ctx.timeout().addTimeoutObject( + new GridTimeoutObjectAdapter( + IgniteUuid.fromUuid(ctx.localNodeId()), + AFFINITY_MAP_CLEAN_UP_DELAY) { + @Override public void onTimeout() { + affMap.keySet().removeAll(rmv); + } + }); } } }; @@ -140,7 +138,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); + ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); } /** {@inheritDoc} */ @@ -148,6 +146,11 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.event().removeLocalEventListener(lsnr); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + affMap.clear(); + } + /** * @param cacheName Cache name. * @param key Key. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8a2ff8/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java new file mode 100644 index 0000000..239647c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerClientReconnectAfterClusterRestartTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.datastreamer; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests DataStreamer reconnect behaviour when a client node arrives at the same or a different topVer than it left. + */ +public class DataStreamerClientReconnectAfterClusterRestartTest extends GridCommonAbstractTest { + /** */ + public static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean clientMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setCacheConfiguration(new CacheConfiguration<>("test")); + + cfg.setClientMode(clientMode); + + return cfg; + } + + /** */ + public void testOneClient() throws Exception { + clusterRestart(false, false); + } + + /** */ + public void testOneClientAllowOverwrite() throws Exception { + clusterRestart(false, true); + } + + /** */ + public void testTwoClients() throws Exception { + clusterRestart(true, false); + } + + /** */ + public void testTwoClientsAllowOverwrite() throws Exception { + clusterRestart(true, true); + } + + /** */ + private void clusterRestart(boolean withAnotherClient, boolean allowOverwrite) throws Exception { + CountDownLatch disconnect = new CountDownLatch(1); +
CountDownLatch reconnect = new CountDownLatch(1); + + try { + startGrid(0); + + clientMode = true; + + Ignite client = startGrid(1); + + if (withAnotherClient) { + // Force increase of topVer + startGrid(2); + + stopGrid(2); + } + + clientMode = false; + + try (IgniteDataStreamer<String, String> streamer = client.dataStreamer("test")) { + streamer.allowOverwrite(allowOverwrite); + + streamer.addData("k1", "v1"); + } + + // Restart the cluster so that client reconnects to a new one + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + reconnect.countDown(); + + return false; + } + }, EventType.EVT_CLIENT_NODE_RECONNECTED); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + disconnect.countDown(); + + return false; + } + }, EventType.EVT_CLIENT_NODE_DISCONNECTED); + + stopGrid(0); + + disconnect.await(); + + startGrid(0); + + reconnect.await(); + + try (IgniteDataStreamer<String, String> streamer = client.dataStreamer("test")) { + streamer.allowOverwrite(allowOverwrite); + + streamer.addData("k2", "v2"); + + return; + } + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6a8a2ff8/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 1bf65e0..c607284 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -143,6 +143,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExcepti import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerClientReconnectAfterClusterRestartTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest; @@ -253,6 +254,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class); suite.addTestSuite(DataStreamerImplSelfTest.class); suite.addTestSuite(DataStreamerTimeoutTest.class); + suite.addTestSuite(DataStreamerClientReconnectAfterClusterRestartTest.class); GridTestUtils.addTestIfNeeded(suite, GridCacheEntryMemorySizeSelfTest.class, ignoredTests); suite.addTestSuite(GridCacheClearAllSelfTest.class); suite.addTestSuite(GridCacheObjectToStringSelfTest.class);