client join race (cherry picked from commit 682ca8e)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/925370c3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/925370c3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/925370c3 Branch: refs/heads/ignite-5398 Commit: 925370c3c6c587870e19f31f8660139884847e06 Parents: 58b6e05 Author: sboikov <[email protected]> Authored: Sat May 6 12:15:21 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 10 12:00:04 2017 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 23 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 4 + .../cache/distributed/CacheStartOnJoinTest.java | 242 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 5 files changed, 270 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 334812c..3edf860 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -130,6 +131,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** Remote nodes. */ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + /** */ + private final LinkedHashMap<UUID, Map<Integer, byte[]>> delayDiscoData = new LinkedHashMap<>(); + /** Topology history. */ private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); @@ -1479,6 +1483,8 @@ class ClientImpl extends TcpDiscoveryImpl { nodeAdded = false; + delayDiscoData.clear(); + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + "client node disconnected."); @@ -1729,9 +1735,12 @@ class ClientImpl extends TcpDiscoveryImpl { Map<Integer, byte[]> data = msg.newNodeDiscoveryData(); - if (data != null) - spi.onExchange(newNodeId, newNodeId, data, - U.resolveClassLoader(spi.ignite().configuration())); + if (data != null) { + if (joining()) + delayDiscoData.put(newNodeId, data); + else + spi.onExchange(newNodeId, newNodeId, data, U.resolveClassLoader(spi.ignite().configuration())); + } } } else { @@ -1758,6 +1767,14 @@ class ClientImpl extends TcpDiscoveryImpl { U.resolveClassLoader(spi.ignite().configuration())); } + if (!delayDiscoData.isEmpty()) { + for (Map.Entry<UUID, Map<Integer, byte[]>> entry : delayDiscoData.entrySet()) + spi.onExchange(entry.getKey(), entry.getKey(), entry.getValue(), + U.resolveClassLoader(spi.ignite().configuration())); + + delayDiscoData.clear(); + } + locNode.setAttributes(msg.clientNodeAttributes()); locNode.visible(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b509fc5..742227e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2522,6 +2522,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + spi.startMessageProcess(msg); + sendHeartbeatMessage(); DebugLogger log = messageLogger(msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 4bb8dff..8cf3910 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1414,6 +1414,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T writeToSocket(sock, socketStream(sock), msg, timeout); } + protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + // No-op. + } + /** * Writes message to the socket. * http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java new file mode 100644 index 0000000..b5723fe --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheStartOnJoinTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Iteration. */ + private static final int ITERATIONS = 5; + + /** */ + private boolean client; + + static void doSleep(long millis) { + try { + U.sleep(1000); + } + catch (Exception e) { + throw new IgniteException(); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi testSpi = new TcpDiscoverySpi() { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + } + + private boolean delay = true; + + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (getTestGridName(0).equals(ignite.name())) { + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg; + + if (msg0.client() && delay) { + log.info("Delay join processing: " + msg0); + + delay = false; + + doSleep(5000); + } + } + } + + super.startMessageProcess(msg); + } + }; + + testSpi.setIpFinder(ipFinder); + testSpi.setJoinTimeout(60_000); + + cfg.setDiscoverySpi(testSpi); + + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setPageSize(1024); + memCfg.setPageCacheSize(50 * 1024 * 1024); + + cfg.setMemoryConfiguration(memCfg); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, "true"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testStartNodes() throws Exception { + for (int i = 0; i < ITERATIONS; i++) { + try { + log.info("Iteration: " + (i + 1) + '/' + ITERATIONS); + + doTest(); + } + finally { + stopAllGrids(true); + } + } + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + client = false; + + final int CLIENTS = 5; + final int SRVS = 4; + + Ignite srv = startGrids(SRVS); + + srv.getOrCreateCaches(cacheConfigurations()); + + final CyclicBarrier b = new CyclicBarrier(CLIENTS); + + client = true; + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + try { + b.await(); + + startGrid(idx + SRVS); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + }, CLIENTS, "start-client"); + + final int NODES = CLIENTS + SRVS; + + for (int i = 0; i < CLIENTS + 1; i++) { + Ignite node = ignite(i); + + log.info("Check node: " + node.name()); + + assertEquals((Boolean)(i >= SRVS), node.configuration().isClientMode()); + + for (int c = 0; c < 5; c++) { + Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes(); + + assertEquals(NODES, nodes.size()); + } + + for (int c = 0; c < 5; c++) { + for (IgniteCache cache : node.getOrCreateCaches(cacheConfigurations())) { + cache.put(i, i); + + cache.get(i); + } + } + } + } + + private Collection<CacheConfiguration> cacheConfigurations() { + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + for (int i = 0; i < 5; i++) + ccfgs.add(cacheConfiguration("cache-" + i)); + + return ccfgs; + } + + private CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration ccfg = new CacheConfiguration(cacheName); + + ccfg.setName(cacheName); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + + return ccfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 3abbdab..2a9fc99 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRest import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; @@ -224,6 +225,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class); suite.addTestSuite(IgniteCacheCreatePutTest.class); + suite.addTestSuite(CacheStartOnJoinTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
