client join race

(cherry picked from commit 682ca8e)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/925370c3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/925370c3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/925370c3

Branch: refs/heads/ignite-5398
Commit: 925370c3c6c587870e19f31f8660139884847e06
Parents: 58b6e05
Author: sboikov <[email protected]>
Authored: Sat May 6 12:15:21 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed May 10 12:00:04 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  23 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   2 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   4 +
 .../cache/distributed/CacheStartOnJoinTest.java | 242 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 5 files changed, 270 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 334812c..3edf860 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -130,6 +131,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new 
ConcurrentHashMap8<>();
 
+    /**
+     * New-node discovery data (from {@code msg.newNodeDiscoveryData()}) that arrived while this
+     * client was still joining, keyed by the new node's ID. Instead of being handed to
+     * {@code spi.onExchange()} immediately, it is stored here and replayed later in insertion
+     * order, after which the map is cleared.
+     */
+    private final LinkedHashMap<UUID, Map<Integer, byte[]>> delayDiscoData = 
new LinkedHashMap<>();
+
     /** Topology history. */
     private final NavigableMap<Long, Collection<ClusterNode>> topHist = new 
TreeMap<>();
 
@@ -1479,6 +1483,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                                 nodeAdded = false;
 
+                                delayDiscoData.clear();
+
                                 IgniteClientDisconnectedCheckedException err =
                                     new 
IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
                                     "client node disconnected.");
@@ -1729,9 +1735,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
-                        if (data != null)
-                            spi.onExchange(newNodeId, newNodeId, data,
-                                
U.resolveClassLoader(spi.ignite().configuration()));
+                        if (data != null) {
+                            if (joining())
+                                delayDiscoData.put(newNodeId, data);
+                            else
+                                spi.onExchange(newNodeId, newNodeId, data, 
U.resolveClassLoader(spi.ignite().configuration()));
+                        }
                     }
                 }
                 else {
@@ -1758,6 +1767,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 
U.resolveClassLoader(spi.ignite().configuration()));
                     }
 
+                    if (!delayDiscoData.isEmpty()) {
+                        for (Map.Entry<UUID, Map<Integer, byte[]>> entry : 
delayDiscoData.entrySet())
+                            spi.onExchange(entry.getKey(), entry.getKey(), 
entry.getValue(),
+                                    
U.resolveClassLoader(spi.ignite().configuration()));
+
+                        delayDiscoData.clear();
+                    }
+
                     locNode.setAttributes(msg.clientNodeAttributes());
                     locNode.visible(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b509fc5..742227e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2522,6 +2522,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage 
msg) {
+            spi.startMessageProcess(msg);
+
             sendHeartbeatMessage();
 
             DebugLogger log = messageLogger(msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 4bb8dff..8cf3910 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1414,6 +1414,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
         writeToSocket(sock, socketStream(sock), msg, timeout);
     }
 
+    /**
+     * Hook called by {@code ServerImpl} as the first step of its {@code processMessage()}.
+     * The default implementation does nothing; subclasses may override it, for example to
+     * delay message processing in tests.
+     *
+     * @param msg Message that is about to be processed.
+     */
+    protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+        // No-op.
+    }
+
     /**
      * Writes message to the socket.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
new file mode 100644
index 0000000..b5723fe
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheStartOnJoinTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Iteration. */
+    private static final int ITERATIONS = 5;
+
+    /** */
+    private boolean client;
+
+    /**
+     * Sleeps for the requested time, converting any failure into an unchecked exception.
+     *
+     * @param millis Time to sleep, in milliseconds.
+     * @throws IgniteException If the sleep fails; the original exception is kept as the cause.
+     */
+    static void doSleep(long millis) {
+        try {
+            // Honor the requested duration (previously hard-coded to 1000 ms).
+            U.sleep(millis);
+        }
+        catch (Exception e) {
+            // Keep the cause so the failure can be diagnosed.
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Discovery SPI that stalls processing of the first client join request seen by grid 0.
+        TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+            // Pass-through override: adds nothing on top of the parent implementation.
+            @Override protected void writeToSocket(Socket sock, OutputStream 
out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, 
IgniteCheckedException {
+                super.writeToSocket(sock, out, msg, timeout);
+            }
+
+            // One-shot flag: true until a client join request has been delayed by this SPI instance.
+            private boolean delay = true;
+
+            @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+                // Only the node whose name equals getTestGridName(0) ever delays a message.
+                if (getTestGridName(0).equals(ignite.name())) {
+                    if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        TcpDiscoveryJoinRequestMessage msg0 = 
(TcpDiscoveryJoinRequestMessage)msg;
+
+                        // Delay only join requests flagged as client, and only the first one.
+                        if (msg0.client() && delay) {
+                            log.info("Delay join processing: " + msg0);
+
+                            delay = false;
+
+                            doSleep(5000);
+                        }
+                    }
+                }
+
+                super.startMessageProcess(msg);
+            }
+        };
+
+        testSpi.setIpFinder(ipFinder);
+        testSpi.setJoinTimeout(60_000);
+
+        cfg.setDiscoverySpi(testSpi);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setPageSize(1024);
+        memCfg.setPageCacheSize(50 * 1024 * 1024);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        // The 'client' field is toggled by doTest() before each group of nodes is started.
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // 10 * 60 * 1000: presumably milliseconds, i.e. 10 minutes - confirm against the base class.
+        return 10 * 60 * 1000L;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Set IGNITE_START_CACHES_ON_JOIN to "true" for this test class; cleared again in afterTestsStopped().
+        System.setProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, 
"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Remove the system property that beforeTestsStarted() set.
+        
System.clearProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN);
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Runs {@link #doTest()} {@code ITERATIONS} times, stopping all grids after every
+     * iteration whether it passed or failed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStartNodes() throws Exception {
+        for (int i = 0; i < ITERATIONS; i++) {
+            try {
+                log.info("Iteration: " + (i + 1) + '/' + ITERATIONS);
+
+                doTest();
+            }
+            finally {
+                // Always tear the topology down so the next iteration starts from scratch.
+                stopAllGrids(true);
+            }
+        }
+    }
+
+    /**
+     * Single test iteration: starts {@code SRVS} server nodes, creates the test caches, then
+     * starts {@code CLIENTS} client nodes from concurrent threads and checks cache node
+     * visibility and basic put/get on the started nodes.
+     *
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        // getConfiguration() reads this field, so nodes started next are servers.
+        client = false;
+
+        final int CLIENTS = 5;
+        final int SRVS = 4;
+
+        Ignite srv = startGrids(SRVS);
+
+        srv.getOrCreateCaches(cacheConfigurations());
+
+        // Barrier with one party per client thread, awaited right before startGrid().
+        final CyclicBarrier b = new CyclicBarrier(CLIENTS);
+
+        // Nodes started from here on are configured as clients.
+        client = true;
+
+        // Presumably runs the closure in CLIENTS threads, passing each a distinct index - confirm in GridTestUtils.
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                try {
+                    b.await();
+
+                    // Client grid indexes follow the server indexes.
+                    startGrid(idx + SRVS);
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, CLIENTS, "start-client");
+
+        final int NODES = CLIENTS + SRVS;
+
+        // NOTE(review): the bound is CLIENTS + 1 (indexes 0..5), not NODES (0..8), so only the
+        // servers and the first two clients are checked - confirm whether NODES was intended.
+        for (int i = 0; i < CLIENTS + 1; i++) {
+            Ignite node = ignite(i);
+
+            log.info("Check node: " + node.name());
+
+            // Indexes below SRVS must be servers, the rest clients.
+            assertEquals((Boolean)(i >= SRVS), 
node.configuration().isClientMode());
+
+            // Every cache "cache-0".."cache-4" must report all NODES nodes from this node's view.
+            for (int c = 0; c < 5; c++) {
+                Collection<ClusterNode> nodes = 
node.cluster().forCacheNodes("cache-" + c).nodes();
+
+                assertEquals(NODES, nodes.size());
+            }
+
+            // NOTE(review): 'c' is not used in the body, so this repeats the same put/get pass five times.
+            for (int c = 0; c < 5; c++) {
+                for (IgniteCache cache : 
node.getOrCreateCaches(cacheConfigurations())) {
+                    cache.put(i, i);
+
+                    cache.get(i);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Configurations for the five test caches named "cache-0" through "cache-4".
+     */
+    private Collection<CacheConfiguration> cacheConfigurations() {
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++)
+            ccfgs.add(cacheConfiguration("cache-" + i));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Transactional cache configuration using a rendezvous affinity function.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+        // NOTE(review): presumably redundant with the constructor argument above - confirm.
+        ccfg.setName(cacheName);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+        return ccfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/925370c3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 3abbdab..2a9fc99 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -85,6 +85,7 @@ import 
org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRest
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
@@ -224,6 +225,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheAffinityEarlyTest.class);
         suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class);
         suite.addTestSuite(IgniteCacheCreatePutTest.class);
+        suite.addTestSuite(CacheStartOnJoinTest.class);
 
         suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
 

Reply via email to