http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java index ffb50ca..9d6e82f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java @@ -38,7 +38,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.util.typedef.CAX; @@ -305,10 +305,10 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { GridDhtPartitionFullMap fullMap = top.partitionMap(true); - for (Map.Entry<UUID, GridDhtPartitionMap> fe : fullMap.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap2> fe : fullMap.entrySet()) { UUID nodeId = fe.getKey(); - GridDhtPartitionMap m = fe.getValue(); + GridDhtPartitionMap2 m = fe.getValue(); for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) { int p = e.getKey(); @@ -439,12 +439,12 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest { assert orig.keySet().equals(cmp.keySet()); - for (Map.Entry<UUID, GridDhtPartitionMap> entry : orig.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap2> entry : orig.entrySet()) { UUID nodeId = entry.getKey(); - GridDhtPartitionMap nodeMap = entry.getValue(); + GridDhtPartitionMap2 nodeMap = entry.getValue(); - GridDhtPartitionMap cmpMap = cmp.get(nodeId); + GridDhtPartitionMap2 cmpMap = cmp.get(nodeId); assert cmpMap != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java index a71475c..34e4333 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java @@ -38,7 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; @@ -361,7 +361,7 @@ public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { GridDhtPartitionFullMap allParts = dht.topology().partitionMap(false); - for (GridDhtPartitionMap parts : allParts.values()) { + for (GridDhtPartitionMap2 parts : allParts.values()) { if (!parts.nodeId().equals(g.cluster().localNode().id())) { for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) { int p = e.getKey(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java index df55f7e..dd46e23 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -114,7 +114,7 @@ public class GridCacheDhtTestUtils { List<Integer> affParts = new LinkedList<>(); - GridDhtPartitionMap map = dht.topology().partitions(locNode.id()); + GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id()); if (map != null) for (int p : map.keySet()) @@ -146,7 +146,7 @@ public class GridCacheDhtTestUtils { System.out.println("\nNode map:"); - for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(false).entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap2> e : top.partitionMap(false).entrySet()) { List<Integer> list = new ArrayList<>(e.getValue().keySet()); Collections.sort(list); @@ -184,7 +184,7 @@ public class GridCacheDhtTestUtils { // They should be in topology in OWNING state. Collection<Integer> affParts = new HashSet<>(); - GridDhtPartitionMap map = dht.topology().partitions(locNode.id()); + GridDhtPartitionMap2 map = dht.topology().partitions(locNode.id()); if (map != null) for (int p : map.keySet()) http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java new file mode 100644 index 0000000..a1ea7ad --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ConcurrentHashMap8; + +/** + * + */ +public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** partitioned cache name. */ + protected static String CACHE = null; + + /** */ + private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>(); + + /** */ + private volatile boolean record = false; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = new DelayableCommunicationSpi(); + + commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + commSpi.setTcpNoDelay(true); + + iCfg.setCommunicationSpi(commSpi); + + return iCfg; + } + + /** + * Helps to delay GridDhtPartitionsFullMessages. + */ + public class DelayableCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(final ClusterNode node, final Message msg, + final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { + final Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridDhtPartitionsFullMessage && record && + ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) { + rs.putIfAbsent(node.id(), new Runnable() { + @Override public void run() { + DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure); + } + }); + } + else + try { + super.sendMessage(node, msg, ackClosure); + } + catch (Exception e) { + U.log(null, e); + } + + } + } + + /** + * @throws Exception e. + */ + public void test() throws Exception { + startGrid(0); + + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(); + + cfg.setName(CACHE); + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setBackups(1); + + ignite(0).getOrCreateCache(cfg); + + startGrid(1); + startGrid(2); + startGrid(3); + + awaitPartitionMapExchange(true); + + for (int i = 0; i < 2; i++) { + stopGrid(3); + + awaitPartitionMapExchange(true); + + startGrid(3); + + awaitPartitionMapExchange(true); + } + + startGrid(4); + + awaitPartitionMapExchange(true); + + assert rs.isEmpty(); + + record = true; + + while (rs.size() < 3) { // N - 1 nodes. + U.sleep(10); + } + + ignite(0).destroyCache(CACHE); + + ignite(0).getOrCreateCache(cfg); + + awaitPartitionMapExchange(); + + for (Runnable r : rs.values()) { + r.run(); + } + + U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. + + stopGrid(3); // Forces exchange at all nodes and cause assertion failure in case obsolete partition map accepted. + + awaitPartitionMapExchange(); + + long topVer0 = grid(0).context().cache().context().exchange().readyAffinityVersion().topologyVersion(); + long topVer1 = grid(1).context().cache().context().exchange().readyAffinityVersion().topologyVersion(); + long topVer2 = grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion(); + + stopGrid(4); // Should force exchange in case exchange manager alive. + + awaitPartitionMapExchange(); + + // Will fail in case exchange-workers are dead. + assert grid(0).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer0; + assert grid(1).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer1; + assert grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index ea13cdd..b02d022 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest; import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest; @@ -141,6 +142,7 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class); suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class); suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class); + suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class); // Test for byte array value special case. suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 9cf1e75..f515a78 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -58,7 +58,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; @@ -358,7 +358,7 @@ public class GridReduceQueryExecutor { private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) { GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false); - for (GridDhtPartitionMap map : fullMap.values()) { + for (GridDhtPartitionMap2 map : fullMap.values()) { if (map.hasMovingPartitions()) return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java index 83c50bd..ac91b51 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/WaitMapExchangeFinishCallable.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.resources.IgniteInstanceResource; import org.yardstickframework.BenchmarkUtils; @@ -53,7 +53,7 @@ public class WaitMapExchangeFinishCallable implements IgniteCallable<Void> { boolean success = true; if (top.topologyVersion().topologyVersion() == ignite.cluster().topologyVersion()) { - for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(true).entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap2> e : top.partitionMap(true).entrySet()) { for (Map.Entry<Integer, GridDhtPartitionState> p : e.getValue().entrySet()) { if (p.getValue() != GridDhtPartitionState.OWNING) { BenchmarkUtils.println("Not owning partition [part=" + p.getKey() + http://git-wip-us.apache.org/repos/asf/ignite/blob/bcfe78b0/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java index 83fc58f..1a700c2 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.mxbean.IgniteMXBean; @@ -196,7 +196,7 @@ public abstract class IgniteFailoverAbstractBenchmark<K, V> extends IgniteCacheA GridDhtPartitionFullMap partMap = dht.topology().partitionMap(true); - for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) { log.info("Checking node: " + e.getKey()); for (Map.Entry<Integer, GridDhtPartitionState> e1 : e.getValue().entrySet()) {
