Repository: ignite Updated Branches: refs/heads/ignite-5578 ae9144cd2 -> 81dbfd527
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81dbfd52 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81dbfd52 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81dbfd52 Branch: refs/heads/ignite-5578 Commit: 81dbfd52704890ab6c87fc48dfa424a34a853c8f Parents: ae9144c Author: sboikov <[email protected]> Authored: Wed Aug 2 15:28:59 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 2 16:35:08 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../GridCachePartitionExchangeManager.java | 15 ++ .../GridDhtPartitionsExchangeFuture.java | 4 +- .../GridDhtPartitionsSingleRequest.java | 6 +- .../internal/TestDelayingCommunicationSpi.java | 63 +++++ .../distributed/CacheExchangeMergeTest.java | 230 ++++++++++++++++++- ...eAtomicInvalidPartitionHandlingSelfTest.java | 36 +-- 7 files changed, 317 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 2fa52b6..d3cba2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -168,6 +168,9 @@ public final class IgniteSystemProperties { /** Maximum size for exchange history. 
Default value is {@code 1000}.*/ public static final String IGNITE_EXCHANGE_HISTORY_SIZE = "IGNITE_EXCHANGE_HISTORY_SIZE"; + /** */ + public static final String IGNITE_EXCHANGE_MERGE_DELAY = "IGNITE_EXCHANGE_MERGE_DELAY"; + /** * Name of the system property defining name of command line program. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 19cb14c..fb91a6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -132,6 +132,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000); /** */ + private final long IGNITE_EXCHANGE_MERGE_DELAY = + IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, 0); + + /** */ private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.2.0"); /** Atomic reference for pending partition resend timeout object. */ @@ -1815,6 +1819,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code False} if need wait messages for merged exchanges. 
*/ public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFut) { + if (IGNITE_EXCHANGE_MERGE_DELAY > 0) { + try { + U.sleep(IGNITE_EXCHANGE_MERGE_DELAY); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for exchange merge, thread interrupted: " + e); + + return true; + } + } + AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer; if (exchMergeTestWaitVer != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8241fdf..1a5a8e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1454,9 +1454,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte initFut.onDone(err == null); - ExchangeDiscoveryEvents evts = exchCtx.events(); + if (exchCtx != null && exchCtx.events().hasServerLeft()) { + ExchangeDiscoveryEvents evts = exchCtx.events(); - if (evts.hasServerLeft()) { for (DiscoveryEvent evt : exchCtx.events().events()) { if (evts.serverLeftEvent(evt)) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index 4f1cdc5..82feb12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@ -55,7 +55,7 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes * @param restoreExchId Initial exchange ID for current exchange. * @return Message. */ - public static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId msgExchId, GridDhtPartitionExchangeId restoreExchId) { + static GridDhtPartitionsSingleRequest restoreStateRequest(GridDhtPartitionExchangeId msgExchId, GridDhtPartitionExchangeId restoreExchId) { GridDhtPartitionsSingleRequest msg = new GridDhtPartitionsSingleRequest(msgExchId); msg.restoreState(true); @@ -65,11 +65,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes return msg; } - public GridDhtPartitionExchangeId restoreExchangeId() { + GridDhtPartitionExchangeId restoreExchangeId() { return restoreExchId; } - public void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) { + void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) { this.restoreExchId = restoreExchId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java 
b/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java new file mode 100644 index 0000000..e49d5da --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestDelayingCommunicationSpi.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jsr166.ThreadLocalRandom8; + +/** + * + */ +public abstract class TestDelayingCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) + throws IgniteSpiException { + try { + GridIoMessage ioMsg = (GridIoMessage)msg; + + if (delayMessage(ioMsg.message(), ioMsg)) + U.sleep(ThreadLocalRandom8.current().nextInt(delayMillis()) + 1); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException(e); + } + + super.sendMessage(node, msg, ackC); + } + + /** + * @param msg Message. + * @param ioMsg Wrapper message. + * @return {@code True} if need delay message. + */ + protected abstract boolean delayMessage(Message msg, GridIoMessage ioMsg); + + /** + * @return Max delay time. 
+ */ + protected int delayMillis() { + return 250; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 499399f..8d0cb39 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; @@ -44,11 +45,15 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.TestDelayingCommunicationSpi; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -65,6 +70,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.eclipse.jetty.util.ConcurrentHashSet; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -89,6 +95,9 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { private boolean testSpi; /** */ + private boolean testDelaySpi; + + /** */ private static String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"}; /** */ @@ -108,6 +117,8 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { if (testSpi) cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + else if (testDelaySpi) + cfg.setCommunicationSpi(new TestDelayExchangeMessagesSpi()); Boolean clientMode = client.get(); @@ -185,11 +196,204 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } // TODO IGNITE-5578 joined merged node failed (client/server). - // TODO IGNITE-5578 random topology changes, random delay for exchange messages. // TODO IGNITE-5578 check exchanges/affinity consistency. - // TODO IGNITE-5578 join with start cache, merge with fail - // TODO IGNITE-5578 join with start cache, merge with join, coordinator left - // TODO IGNITE-5578 join with start cache, merge with join, become coordinator + + /** + * @throws Exception If failed. 
+     */
+    public void testDelayExchangeMessages() throws Exception {
+        // Makes getConfiguration() install TestDelayExchangeMessagesSpi on every node started below.
+        testDelaySpi = true;
+
+        // Read by GridCachePartitionExchangeManager, which sleeps for this long (presumably milliseconds -
+        // TODO confirm against U.sleep) in mergeExchangesOnCoordinator() before merging exchanges.
+        System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, "2000");
+
+        try {
+            final int srvs = 6;
+            final int clients = 3;
+
+            startGridsMultiThreaded(srvs);
+
+            for (int i = 0; i < clients; i++) {
+                client.set(true);
+
+                startGrid(srvs + i);
+            }
+
+            final int initNodes = srvs + clients;
+
+            final AtomicInteger stopIdx = new AtomicInteger();
+
+            // Three threads stop node indexes 1, 2 and 3 after a random pause; index 0 is never stopped.
+            IgniteInternalFuture<?> stopFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Thread.sleep(ThreadLocalRandom.current().nextLong(500) + 1);
+
+                    stopGrid(stopIdx.incrementAndGet());
+
+                    return null;
+                }
+            }, 3, "stop-srv");
+
+            final AtomicInteger startIdx = new AtomicInteger(initNodes);
+
+            // Five threads concurrently start new nodes (a client in roughly one case out of three)
+            // and stop about half of them again right away.
+            IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int nodeIdx = startIdx.incrementAndGet();
+
+                    if (rnd.nextInt(3) == 0) {
+                        log.info("Start client: " + nodeIdx);
+
+                        client.set(true);
+                    }
+                    else
+                        log.info("Start server: " + nodeIdx);
+
+                    startGrid(nodeIdx);
+
+                    if (rnd.nextBoolean()) {
+                        log.info("Stop started node: " + nodeIdx);
+
+                        stopGrid(nodeIdx);
+                    }
+
+                    return null;
+                }
+            }, 5, "start-node");
+
+            // Wait for BOTH groups of threads. Previously only stopFut was awaited, so checkCaches()
+            // could run while nodes were still starting/stopping, failures in the start threads were
+            // silently dropped, and those threads could keep running after the test had returned.
+            stopFut.get();
+            startFut.get();
+
+            checkCaches();
+        }
+        finally {
+            System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeStartRandomClientsServers() throws Exception {
+        for (int iter = 0; iter < 3; iter++) {
+            ThreadLocalRandom topRnd = ThreadLocalRandom.current();
+
+            // Random initial topology for this iteration: 1..3 servers and 0..2 clients.
+            final int srvs = topRnd.nextInt(3) + 1;
+            final int clients = topRnd.nextInt(3);
+
+            log.info("Iteration [iter=" + iter + ", srvs=" + srvs + ", clients=" + clients + ']');
+
+            Ignite srv0 = startGrids(srvs);
+
+            final int initNodes = srvs + clients;
+
+            // Client nodes take the indexes that directly follow the servers.
+            for (int clientIdx = srvs; clientIdx < initNodes; clientIdx++) {
+                client.set(true);
+
+                startGrid(clientIdx);
+            }
+
+            final int threads = 8;
+
+            // NOTE(review): helper is defined elsewhere in this test; presumably it holds exchange
+            // merging on srv0 until the given topology version is reached - confirm.
+            mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+            final AtomicInteger idx = new AtomicInteger(initNodes);
+
+            // Every worker thread starts exactly one extra node, a client in roughly one case out of three.
+            Callable<Void> startNode = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int nodeIdx = idx.incrementAndGet();
+
+                    boolean startClient = ThreadLocalRandom.current().nextInt(3) == 0;
+
+                    if (!startClient)
+                        log.info("Start server: " + nodeIdx);
+                    else {
+                        log.info("Start client: " + nodeIdx);
+
+                        client.set(true);
+                    }
+
+                    startGrid(nodeIdx);
+
+                    return null;
+                }
+            };
+
+            // Block until all concurrent starts have finished before verifying the caches.
+            GridTestUtils.runMultiThreadedAsync(startNode, threads, "test-thread").get();
+
+            checkCaches();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMergeStartStopRandomClientsServers() throws Exception {
+        // Fixed topology parameters, identical for every iteration.
+        final int srvs = 5;
+        final int clients = 5;
+        final int initNodes = srvs + clients;
+        final int threads = 8;
+
+        for (int iter = 0; iter < 3; iter++) {
+            Ignite srv0 = startGrids(srvs);
+
+            // Client nodes take the indexes that directly follow the servers.
+            for (int clientIdx = srvs; clientIdx < initNodes; clientIdx++) {
+                client.set(true);
+
+                startGrid(clientIdx);
+            }
+
+            // NOTE(review): helper is defined elsewhere in this test; presumably it holds exchange
+            // merging on srv0 until the given topology version is reached - confirm.
+            mergeExchangeWaitVersion(srv0, initNodes + threads);
+
+            final AtomicInteger idx = new AtomicInteger(initNodes);
+
+            // Indexes already claimed for stopping, so that no node is stopped by two threads.
+            final ConcurrentHashSet<Integer> stopNodes = new ConcurrentHashSet<>();
+
+            // Each worker thread either stops one of the initial nodes or starts one new node.
+            Callable<Void> topChange = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    if (!rnd.nextBoolean()) {
+                        int nodeIdx = idx.incrementAndGet();
+
+                        boolean startClient = rnd.nextInt(5) == 0;
+
+                        if (!startClient)
+                            log.info("Start server: " + nodeIdx);
+                        else {
+                            log.info("Start client: " + nodeIdx);
+
+                            client.set(true);
+                        }
+
+                        startGrid(nodeIdx);
+
+                        return null;
+                    }
+
+                    Integer stopIdx;
+
+                    // Candidates are indexes 1..initNodes-1 (node 0 is never stopped); retry until
+                    // an index not yet claimed by another thread is drawn.
+                    do {
+                        stopIdx = rnd.nextInt(initNodes - 1) + 1;
+                    }
+                    while (!stopNodes.add(stopIdx));
+
+                    log.info("Stop node: " + stopIdx);
+
+                    stopGrid(getTestIgniteInstanceName(stopIdx), true, false);
+
+                    return null;
+                }
+            };
+
+            // Block until every topology change has completed before verifying the caches.
+            GridTestUtils.runMultiThreadedAsync(topChange, threads, "test-thread").get();
+
+            checkCaches();
+
+            stopAllGrids();
+        }
+    }
 
     /**
      * @throws Exception If failed.
@@ -325,6 +529,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         checkCaches();
     }
 
+    // TODO IGNITE-5578 join with start cache, merge with fail
+    // TODO IGNITE-5578 join with start cache, merge with join, coordinator left
+    // TODO IGNITE-5578 join with start cache, merge with join, become coordinator
+
     /**
      * @throws Exception If failed.
*/ @@ -1035,4 +1243,18 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { */ NON_CRD_RCVD } + + /** + * + */ + + static class TestDelayExchangeMessagesSpi extends TestDelayingCommunicationSpi { + /** {@inheritDoc} */ + @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) { + if (msg instanceof GridDhtPartitionsAbstractMessage) + return ((GridDhtPartitionsAbstractMessage)msg).exchangeId() != null || (msg instanceof GridDhtPartitionsSingleRequest); + + return false; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/81dbfd52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index f94e34b..3f2fe8a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -29,7 +29,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CachePartialUpdateException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; @@ -37,8 +36,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import 
org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestDelayingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -47,16 +46,12 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jsr166.ThreadLocalRandom8; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; @@ -367,32 +362,11 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA /** * */ - private static class DelayCommunicationSpi extends TcpCommunicationSpi { + private static class DelayCommunicationSpi extends TestDelayingCommunicationSpi { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) - throws IgniteSpiException { - try { - if (delayMessage((GridIoMessage)msg)) - 
U.sleep(ThreadLocalRandom8.current().nextInt(250) + 1); - } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteSpiException(e); - } - - super.sendMessage(node, msg, ackC); - } - - /** - * Checks if message should be delayed. - * - * @param msg Message to check. - * @return {@code True} if message should be delayed. - */ - private boolean delayMessage(GridIoMessage msg) { - Object origMsg = msg.message(); - - return delay && - ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || (origMsg instanceof GridDhtAtomicAbstractUpdateRequest)); + @Override protected boolean delayMessage(Message msg, GridIoMessage ioMsg) { + return delay && ((msg instanceof GridNearAtomicAbstractUpdateRequest) || + (msg instanceof GridDhtAtomicAbstractUpdateRequest)); } } }
