Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 ae9144cd2 -> 81dbfd527


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81dbfd52
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81dbfd52
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81dbfd52

Branch: refs/heads/ignite-5578
Commit: 81dbfd52704890ab6c87fc48dfa424a34a853c8f
Parents: ae9144c
Author: sboikov <[email protected]>
Authored: Wed Aug 2 15:28:59 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Aug 2 16:35:08 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../GridCachePartitionExchangeManager.java      |  15 ++
 .../GridDhtPartitionsExchangeFuture.java        |   4 +-
 .../GridDhtPartitionsSingleRequest.java         |   6 +-
 .../internal/TestDelayingCommunicationSpi.java  |  63 +++++
 .../distributed/CacheExchangeMergeTest.java     | 230 ++++++++++++++++++-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |  36 +--
 7 files changed, 317 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 2fa52b6..d3cba2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -168,6 +168,9 @@ public final class IgniteSystemProperties {
     /** Maximum size for exchange history. Default value is {@code 1000}.*/
     public static final String IGNITE_EXCHANGE_HISTORY_SIZE = 
"IGNITE_EXCHANGE_HISTORY_SIZE";
 
+    /** */
+    public static final String IGNITE_EXCHANGE_MERGE_DELAY = 
"IGNITE_EXCHANGE_MERGE_DELAY";
+
     /**
      * Name of the system property defining name of command line program.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 19cb14c..fb91a6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -132,6 +132,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE,
 1_000);
 
     /** */
+    private final long IGNITE_EXCHANGE_MERGE_DELAY =
+        
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY,
 0);
+
+    /** */
     private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = 
IgniteProductVersion.fromString("2.2.0");
 
     /** Atomic reference for pending partition resend timeout object. */
@@ -1815,6 +1819,17 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return {@code False} if need wait messages for merged exchanges.
      */
     public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture 
curFut) {
+        if (IGNITE_EXCHANGE_MERGE_DELAY > 0) {
+            try {
+                U.sleep(IGNITE_EXCHANGE_MERGE_DELAY);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Failed to wait for exchange merge, thread 
interrupted: " + e);
+
+                return true;
+            }
+        }
+
         AffinityTopologyVersion exchMergeTestWaitVer = 
this.exchMergeTestWaitVer;
 
         if (exchMergeTestWaitVer != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8241fdf..1a5a8e2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1454,9 +1454,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             initFut.onDone(err == null);
 
-            ExchangeDiscoveryEvents evts = exchCtx.events();
+            if (exchCtx != null && exchCtx.events().hasServerLeft()) {
+                ExchangeDiscoveryEvents evts = exchCtx.events();
 
-            if (evts.hasServerLeft()) {
                 for (DiscoveryEvent evt : exchCtx.events().events()) {
                     if (evts.serverLeftEvent(evt)) {
                         for (CacheGroupContext grp : 
cctx.cache().cacheGroups())

http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 4f1cdc5..82feb12 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -55,7 +55,7 @@ public class GridDhtPartitionsSingleRequest extends 
GridDhtPartitionsAbstractMes
      * @param restoreExchId Initial exchange ID for current exchange.
      * @return Message.
      */
-    public static GridDhtPartitionsSingleRequest 
restoreStateRequest(GridDhtPartitionExchangeId msgExchId, 
GridDhtPartitionExchangeId restoreExchId) {
+    static GridDhtPartitionsSingleRequest 
restoreStateRequest(GridDhtPartitionExchangeId msgExchId, 
GridDhtPartitionExchangeId restoreExchId) {
         GridDhtPartitionsSingleRequest msg = new 
GridDhtPartitionsSingleRequest(msgExchId);
 
         msg.restoreState(true);
@@ -65,11 +65,11 @@ public class GridDhtPartitionsSingleRequest extends 
GridDhtPartitionsAbstractMes
         return msg;
     }
 
-    public GridDhtPartitionExchangeId restoreExchangeId() {
+    GridDhtPartitionExchangeId restoreExchangeId() {
         return restoreExchId;
     }
 
-    public void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) {
+    void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) {
         this.restoreExchId = restoreExchId;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
new file mode 100644
index 0000000..e49d5da
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jsr166.ThreadLocalRandom8;
+
+/**
+ *
+ */
+public abstract class TestDelayingCommunicationSpi extends TcpCommunicationSpi 
{
+    /** {@inheritDoc} */
+    @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC)
+        throws IgniteSpiException {
+        try {
+            GridIoMessage ioMsg = (GridIoMessage)msg;
+
+            if (delayMessage(ioMsg.message(), ioMsg))
+                U.sleep(ThreadLocalRandom8.current().nextInt(delayMillis()) + 
1);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new IgniteSpiException(e);
+        }
+
+        super.sendMessage(node, msg, ackC);
+    }
+
+    /**
+     * @param msg Message.
+     * @param ioMsg Wrapper message.
+     * @return {@code True} if need delay message.
+     */
+    protected abstract boolean delayMessage(Message msg, GridIoMessage ioMsg);
+
+    /**
+     * @return Max delay time.
+     */
+    protected int delayMillis() {
+        return 250;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 499399f..8d0cb39 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -44,11 +45,15 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.TestDelayingCommunicationSpi;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
@@ -65,6 +70,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -89,6 +95,9 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     private boolean testSpi;
 
     /** */
+    private boolean testDelaySpi;
+
+    /** */
     private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", 
"c7", "c8", "c9", "c10"};
 
     /** */
@@ -108,6 +117,8 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
 
         if (testSpi)
             cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+        else if (testDelaySpi)
+            cfg.setCommunicationSpi(new TestDelayExchangeMessagesSpi());
 
         Boolean clientMode = client.get();
 
@@ -185,11 +196,204 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
     }
 
     // TODO IGNITE-5578 joined merged node failed (client/server).
-    // TODO IGNITE-5578 random topology changes, random delay for exchange 
messages.
     // TODO IGNITE-5578 check exchanges/affinity consistency.
-    // TODO IGNITE-5578 join with start cache, merge with fail
-    // TODO IGNITE-5578 join with start cache, merge with join, coordinator 
left
-    // TODO IGNITE-5578 join with start cache, merge with join, become 
coordinator
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDelayExchangeMessages() throws Exception {
+        testDelaySpi = true;
+
+        System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, 
"2000");
+
+        try {
+            final int srvs = 6;
+            final int clients = 3;
+
+            startGridsMultiThreaded(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int initNodes = srvs + clients;
+
+            final AtomicInteger stopIdx = new AtomicInteger();
+
+            IgniteInternalFuture stopFut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 
1);
+
+                    stopGrid(stopIdx.incrementAndGet());
+
+                    return null;
+                }
+            }, 3, "stop-srv");
+
+            final AtomicInteger startIdx = new AtomicInteger(initNodes);
+
+            IgniteInternalFuture startFut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int nodeIdx = startIdx.incrementAndGet();
+
+                    if (rnd.nextInt(3) == 0) {
+                        log.info("Start client: " + nodeIdx);
+
+                        client.set(true);
+                    }
+                    else
+                        log.info("Start server: " + nodeIdx);
+
+                    startGrid(nodeIdx);
+
+                    if (rnd.nextBoolean()) {
+                        log.info("Stop started node: " + nodeIdx);
+
+                        stopGrid(nodeIdx);
+                    }
+
+                    return null;
+                }
+            }, 5, "start-node");
+
+            stopFut.get();
+
+            checkCaches();
+        }
+        finally {
+            
System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeStartRandomClientsServers() throws Exception {
+        for (int iter = 0; iter < 3; iter++) {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            final int srvs = rnd.nextInt(3) + 1;
+            final int clients = rnd.nextInt(3);
+
+            log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", 
clients=" + clients + ']');
+
+            Ignite srv0 = startGrids(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int threads = 8;
+
+            final int initNodes = srvs + clients;
+
+            mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+            final AtomicInteger idx = new AtomicInteger(initNodes);
+
+            IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int nodeIdx = idx.incrementAndGet();
+
+                    if (rnd.nextInt(3) == 0) {
+                        log.info("Start client: " + nodeIdx);
+
+                        client.set(true);
+                    }
+                    else
+                        log.info("Start server: " + nodeIdx);
+
+                    startGrid(nodeIdx);
+
+                    return null;
+                }
+            }, threads, "test-thread");
+
+            fut.get();
+
+            checkCaches();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeStartStopRandomClientsServers() throws Exception {
+        for (int iter = 0; iter < 3; iter++) {
+            final int srvs = 5;
+            final int clients = 5;
+
+            Ignite srv0 = startGrids(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int threads = 8;
+
+            final int initNodes = srvs + clients;
+
+            mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+            final AtomicInteger idx = new AtomicInteger(initNodes);
+
+            final ConcurrentHashSet<Integer> stopNodes = new 
ConcurrentHashSet<>();
+
+            IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    if (rnd.nextBoolean()) {
+                        Integer stopIdx;
+
+                        for (;;) {
+                            stopIdx = rnd.nextInt(initNodes - 1) + 1;
+
+                            if (stopNodes.add(stopIdx))
+                                break;
+                        }
+
+                        log.info("Stop node: " + stopIdx);
+
+                        stopGrid(getTestIgniteInstanceName(stopIdx), true, 
false);
+                    }
+                    else {
+                        int nodeIdx = idx.incrementAndGet();
+
+                        if (rnd.nextInt(5) == 0) {
+                            log.info("Start client: " + nodeIdx);
+
+                            client.set(true);
+                        }
+                        else
+                            log.info("Start server: " + nodeIdx);
+
+                        startGrid(nodeIdx);
+                    }
+
+                    return null;
+                }
+            }, threads, "test-thread");
+
+            fut.get();
+
+            checkCaches();
+
+            stopAllGrids();
+        }
+    }
 
     /**
      * @throws Exception If failed.
@@ -325,6 +529,10 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         checkCaches();
     }
 
+    // TODO IGNITE-5578 join with start cache, merge with fail
+    // TODO IGNITE-5578 join with start cache, merge with join, coordinator 
left
+    // TODO IGNITE-5578 join with start cache, merge with join, become 
coordinator
+
     /**
      * @throws Exception If failed.
      */
@@ -1035,4 +1243,18 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
          */
         NON_CRD_RCVD
     }
+
+    /**
+     *
+     */
+
+    static class TestDelayExchangeMessagesSpi extends 
TestDelayingCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override protected boolean delayMessage(Message msg, GridIoMessage 
ioMsg) {
+            if (msg instanceof GridDhtPartitionsAbstractMessage)
+                return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != 
null || (msg instanceof GridDhtPartitionsSingleRequest);
+
+            return false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index f94e34b..3f2fe8a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -29,7 +29,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CachePartialUpdateException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -37,8 +36,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestDelayingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -47,16 +46,12 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jsr166.ThreadLocalRandom8;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
@@ -367,32 +362,11 @@ public class 
GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
     /**
      *
      */
-    private static class DelayCommunicationSpi extends TcpCommunicationSpi {
+    private static class DelayCommunicationSpi extends 
TestDelayingCommunicationSpi {
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC)
-            throws IgniteSpiException {
-            try {
-                if (delayMessage((GridIoMessage)msg))
-                    U.sleep(ThreadLocalRandom8.current().nextInt(250) + 1);
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw new IgniteSpiException(e);
-            }
-
-            super.sendMessage(node, msg, ackC);
-        }
-
-        /**
-         * Checks if message should be delayed.
-         *
-         * @param msg Message to check.
-         * @return {@code True} if message should be delayed.
-         */
-        private boolean delayMessage(GridIoMessage msg) {
-            Object origMsg = msg.message();
-
-            return delay &&
-                ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || 
(origMsg instanceof GridDhtAtomicAbstractUpdateRequest));
+        @Override protected boolean delayMessage(Message msg, GridIoMessage 
ioMsg) {
+            return delay && ((msg instanceof 
GridNearAtomicAbstractUpdateRequest) ||
+                (msg instanceof GridDhtAtomicAbstractUpdateRequest));
         }
     }
 }

Reply via email to