Do not re-create node2part map on every singleMap message

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46cba2a4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46cba2a4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46cba2a4

Branch: refs/heads/ignite-5398
Commit: 46cba2a46966759e6d658f3ba991f15722a6634f
Parents: 250b4e0
Author: Alexey Goncharuk <[email protected]>
Authored: Wed May 17 19:04:10 2017 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed May 17 19:04:39 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |   9 +
 .../GridCachePartitionExchangeManager.java      |   8 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  16 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |   3 +
 .../processors/cluster/ClusterProcessor.java    |   2 +-
 .../ignite/internal/util/nio/GridNioServer.java |  18 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  10 +-
 .../resources/META-INF/classnames.properties    |  30 ++-
 .../CacheClientsConcurrentStartTest.java        | 248 +++++++++++++++++++
 10 files changed, 338 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 277d176..4557561 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -319,6 +319,15 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
             else
                 U.error(log, msg0.toString());
 
+            try {
+                cacheMsg.onClassError(new IgniteCheckedException("Failed to 
find message handler for message: " + cacheMsg));
+
+                processFailedMessage(nodeId, cacheMsg, c);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to process failed message: " + e, e);
+            }
+
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 02da4fd..79166f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2000,8 +2000,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             GridDhtPartitionsExchangeFuture fut) {
             GridDhtPartitionsExchangeFuture cur = super.addx(fut);
 
-            while (size() > EXCHANGE_HISTORY_SIZE)
+            while (size() > EXCHANGE_HISTORY_SIZE) {
+                GridDhtPartitionsExchangeFuture last = last();
+
+                if (last != null && !last.isDone())
+                    break;
+
                 removeLast();
+            }
 
             // Return the value in the set.
             return cur == null ? fut : cur;

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d1283c3..d98358a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -90,6 +90,9 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     /** Logger. */
     private final IgniteLogger log;
 
+    /** Time logger. */
+    private final IgniteLogger timeLog;
+
     /** */
     private final AtomicReferenceArray<GridDhtLocalPartition> locParts;
 
@@ -146,6 +149,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         this.entryFactory = entryFactory;
 
         log = cctx.logger(getClass());
+        timeLog = cctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
 
         locParts = new 
AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
     }
@@ -1262,9 +1266,19 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             return null;
         }
 
+        long now = U.currentTimeMillis();
+
         lock.writeLock().lock();
 
         try {
+            long acquired = U.currentTimeMillis();
+
+            if (acquired - now >= 100) {
+                if (timeLog.isInfoEnabled())
+                    timeLog.info("Waited too long to acquire topology write 
lock " +
+                        "[cache=" + cctx.cacheId() + ", waitTime=" + (acquired 
- now) + ']');
+            }
+
             if (stopping)
                 return null;
 
@@ -1316,7 +1330,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
+            node2part.updateSequence(updateSeq);
 
             boolean changed = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 00bcd10..9a0f090 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1444,6 +1444,9 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
         /** */
         private boolean rcvRes;
 
+        /** Remap topology version for debug purpose. */
+        private AffinityTopologyVersion remapTopVer;
+
         /**
          * @param node Node.
          * @param keys Keys.
@@ -1515,6 +1518,8 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
                     return;
 
                 rcvRes = true;
+
+                remapTopVer = res.clientRemapVersion();
             }
 
             if (res.error() != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4e04156..fff1702 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -113,6 +113,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD,
 10);
 
+    /** */
     public static final String EXCHANGE_LOG = 
"org.apache.ignite.internal.exchange.time";
 
     /** */
@@ -551,6 +552,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
             exchLog.info("Start exchange init [topVer=" + topVer +
                 ", crd=" + crdNode +
                 ", evt=" + discoEvt.type() +
+                ", node=" + discoEvt.node() +
+                ", evtNode=" + discoEvt.node() +
                 ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT 
? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) +
                 ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 317b274..28d8cc4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -187,7 +187,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
                         try {
                             ctx.io().send(node, 
GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
                         }
-                        catch (ClusterTopologyCheckedException e) {
+                        catch (ClusterTopologyCheckedException ignore) {
                             if (diagnosticLog.isDebugEnabled()) {
                                 diagnosticLog.debug("Failed to send diagnostic 
response, node left " +
                                     "[node=" + nodeId + ", msg=" + msg + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index cbba5da..de3f363 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2152,7 +2152,14 @@ public class GridNioServer<T> {
                     // This exception will be handled in bodyInternal() method.
                     throw e;
                 }
-                catch (Exception e) {
+                catch (Exception | Error e) { // TODO IGNITE-2659.
+                    try {
+                        U.sleep(1000);
+                    }
+                    catch (IgniteInterruptedCheckedException ignore) {
+                        // No-op.
+                    }
+
                     U.warn(log, "Failed to process selector key (will close): 
" + ses, e);
 
                     close(ses, new GridNioException(e));
@@ -2197,7 +2204,14 @@ public class GridNioServer<T> {
                     // This exception will be handled in bodyInternal() method.
                     throw e;
                 }
-                catch (Exception e) {
+                catch (Exception | Error e) { // TODO IGNITE-2659.
+                    try {
+                        U.sleep(1000);
+                    }
+                    catch (IgniteInterruptedCheckedException ignore) {
+                        // No-op.
+                    }
+
                     if (!closed)
                         U.warn(log, "Failed to process selector key (will 
close): " + ses, e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index a8a5bd3..9a122d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3024,7 +3024,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "in order to prevent parties from waiting forever 
in case of network issues " +
                             "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                    errs.addSuppressed(new IgniteCheckedException("Failed to 
connect to address: " + addr, e));
+                    errs.addSuppressed(new IgniteCheckedException("Failed to 
connect to address " +
+                        "[addr=" + addr + ", err=" + e.getMessage() + ']', e));
 
                     // Reconnect for the second time, if connection is not 
established.
                     if (!failureDetThrReached && connectAttempts < 2 &&
@@ -3054,11 +3055,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
             if (getSpiContext().node(node.id()) != null && 
(CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
                 X.hasCause(errs, ConnectException.class, 
SocketTimeoutException.class, HandshakeTimeoutException.class,
                     IgniteSpiOperationTimeoutException.class)) {
-                LT.warn(log, "TcpCommunicationSpi failed to establish 
connection to node, node will be dropped from " +
+
+                U.error(log, "TcpCommunicationSpi failed to establish 
connection to node, node will be dropped from " +
                     "cluster [" +
-                    "rmtNode=" + node +
-                    ", err=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + 
']');
+                    "rmtNode=" + node + "]", errs);
 
                 getSpiContext().failNode(node.id(), "TcpCommunicationSpi 
failed to establish connection to node [" +
                     "rmtNode=" + node +

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index a9923f6..b548098 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1971,4 +1971,32 @@ 
org.apache.ignite.transactions.TransactionRollbackException
 org.apache.ignite.transactions.TransactionState
 org.apache.ignite.transactions.TransactionTimeoutException
 org.apache.ignite.util.AttributeNodeFilter
-org.apache.ignite.internal.util.GridPartitionStateMap
\ No newline at end of file
+org.apache.ignite.internal.util.GridPartitionStateMap
+org.gridgain.grid.cache.db.GridCacheOffheapManager$RebalanceIteratorAdapter
+org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$FileArchiver$1
+org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$Mode
+org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$RecordsIterator
+org.gridgain.grid.internal.processors.cache.database.CollectDependantSnapshotSetTask
+org.gridgain.grid.internal.processors.cache.database.CollectDependantSnapshotSetTask$CollectDependantSnapshotSetJob
+org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTask
+org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTask$CollectSnapshotInfoJob
+org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTaskResult
+org.gridgain.grid.internal.processors.cache.database.FullPageIdIterableComparator
+org.gridgain.grid.internal.processors.cache.database.GetOngoingOperationTask
+org.gridgain.grid.internal.processors.cache.database.GetOngoingOperationTask$GetOngoingOperationJob
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$13
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$15
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$18
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$CheckpointEntryType
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$Checkpointer$2
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$FullPageIdComparator
+org.gridgain.grid.internal.processors.cache.database.SnapshotOperationFuture$1
+org.gridgain.grid.internal.processors.cache.database.SnapshotTaskBase
+org.gridgain.grid.internal.processors.cache.database.messages.CheckSnapshotFinishedMessage
+org.gridgain.grid.internal.processors.cache.database.messages.CheckSnapshotMetadataMessage
+org.gridgain.grid.internal.processors.cache.database.messages.ClusterWideSnapshotOperationFinishedMessage
+org.gridgain.grid.internal.processors.cache.database.messages.SnapshotIssueMessage
+org.gridgain.grid.internal.processors.cache.database.messages.SnapshotOperationFinishedMessage
+org.gridgain.grid.internal.processors.cache.database.messages.SnapshotProgressMessage
+org.gridgain.grid.internal.visor.database.VisorCheckpointMetrics
+org.gridgain.grid.internal.visor.database.VisorMemoryPoolMetrics

http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
new file mode 100644
index 0000000..44425f1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public class CacheClientsConcurrentStartTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRV_CNT = 4;
+
+    /** */
+    private static final int CLIENTS_CNT = 16;
+
+    /** */
+    private static final int CACHES = 30;
+
+    /** Stopped. */
+    private volatile boolean stopped;
+
+    /** Iteration. */
+    private static final int ITERATIONS = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+            @Override protected void writeToSocket(Socket sock, OutputStream 
out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, 
IgniteCheckedException {
+                if (msg instanceof TcpDiscoveryCustomEventMessage && 
msg.verified()) {
+                    try {
+                        System.out.println(Thread.currentThread().getName() + 
" delay custom message");
+
+                        U.sleep(ThreadLocalRandom.current().nextLong(500) + 
100);
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+
+                super.writeToSocket(sock, out, msg, timeout);
+            }
+        };
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setMarshaller(null);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        if (getTestGridIndex(gridName) >= SRV_CNT)
+            cfg.setClientMode(true);
+        else {
+            CacheConfiguration ccfgs[] = new CacheConfiguration[CACHES / 2];
+
+            for (int i = 0; i < ccfgs.length; i++)
+                ccfgs[i] = cacheConfiguration("cache-" + i);
+
+            cfg.setCacheConfiguration(ccfgs);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartNodes() throws Exception {
+        for (int i = 0; i < ITERATIONS; i++) {
+            try {
+                log.info("Iteration: " + (i + 1) + '/' + ITERATIONS);
+
+                doTest();
+            }
+            finally {
+                stopAllGrids(true);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        final AtomicBoolean failed = new AtomicBoolean();
+
+        startGrids(SRV_CNT);
+
+        for (int i = 0; i < SRV_CNT; i++) {
+            
((TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi()).blockMessages(new
 IgnitePredicate<GridIoMessage>() {
+                @Override public boolean apply(GridIoMessage msg) {
+                    if (msg.message() instanceof GridDhtPartitionsFullMessage) 
{
+                        try {
+                            U.sleep(ThreadLocalRandom.current().nextLong(500) 
+ 100);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+
+                    return false;
+                }
+            });
+        }
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS_CNT; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @Override public void run() {
+                    Random rnd = new Random();
+
+                    try {
+                        Ignite ignite = startGrid(SRV_CNT + idx);
+
+                        assertTrue(ignite.configuration().isClientMode());
+
+                        for (int i = 0; i < CACHES / 2; i++) {
+                            String cacheName = "cache-" + rnd.nextInt(CACHES);
+
+                            IgniteCache<Object, Object> cache = 
getCache(ignite, cacheName);
+
+                            cache.put(ignite.cluster().localNode().id(), 
UUID.randomUUID());
+
+                            IgniteAtomicSequence seq = 
ignite.atomicSequence("seq-" + rnd.nextInt(20), 0, true);
+
+                            seq.getAndIncrement();
+                        }
+
+                        while (!stopped) {
+                            IgniteCache<Object, Object> cache = 
getCache(ignite, "cache-" + rnd.nextInt(CACHES));
+
+                            int val = Math.abs(rnd.nextInt(100));
+
+                            if (val >= 0 && val < 40)
+                                
cache.containsKey(ignite.cluster().localNode().id());
+                            else if (val >= 40 && val < 80)
+                                cache.get(ignite.cluster().localNode().id());
+                            else
+                                cache.put(ignite.cluster().localNode().id(), 
UUID.randomUUID());
+
+                            Thread.sleep(10);
+                        }
+                    }
+                    catch (Exception e) {
+                        log.error("Unexpected error: " + e, e);
+
+                        failed.set(true);
+                    }
+                }
+            }, 1, "client-thread");
+
+            futs.add(fut);
+        }
+
+        Thread.sleep(10_000);
+
+        stopped = true;
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get();
+
+        assertFalse(failed.get());
+    }
+
+    /**
+     * @param grid Grid.
+     * @return Cache.
+     */
+    private IgniteCache getCache(Ignite grid, String cacheName) {
+        return grid.getOrCreateCache(cacheConfiguration(cacheName));
+    }
+
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(cacheName);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(2);
+        ccfg.setNearConfiguration(null);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        return ccfg;
+    }
+}
\ No newline at end of file

Reply via email to