Repository: ignite Updated Branches: refs/heads/ignite-1913 [created] 3fe6cce3e
Ignite-1913 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fe6cce3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fe6cce3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fe6cce3 Branch: refs/heads/ignite-1913 Commit: 3fe6cce3ec2db2e0f2789fad922c335d24240ea1 Parents: 3a8c19e Author: Anton Vinogradov <[email protected]> Authored: Fri Nov 13 18:07:50 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Nov 13 18:07:50 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 76 ++++++-- .../GridDhtPartitionsExchangeFuture.java | 3 +- .../GridDhtPartitionsSingleMessage.java | 25 ++- ...cingDelayedPartitionMapExchangeSelfTest.java | 177 +++++++++++++++++++ 4 files changed, 267 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 81ff028..f93c40f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -713,12 +713,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if message was sent, {@code false} if node left grid. */ private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); + boolean retry = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started()) - m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + GridDhtPartitionsFullMessage m; + + do { + AffinityTopologyVersion topVer = cctx.exchange().topologyVersion(); + + m = new GridDhtPartitionsFullMessage(null, null, topVer); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + cacheCtx.topology().readLock(); + + try { + if (!cacheCtx.topology().topologyVersion().equals(topVer)) { + retry = true; + + break; + } + + if (!cacheCtx.isLocal() && cacheCtx.started()) + m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + + } + finally { + cacheCtx.topology().readUnlock(); + } + } } + while (retry); // It is important that client topologies be added after contexts. for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) @@ -749,17 +772,38 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param id ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, - cctx.kernalContext().clientNode(), - cctx.versions().last()); + boolean retry = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap(); + GridDhtPartitionsSingleMessage m; - m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); + do { + AffinityTopologyVersion topVer = cctx.exchange().topologyVersion(); + + m = new GridDhtPartitionsSingleMessage(id, + cctx.kernalContext().clientNode(), + cctx.versions().last(), + topVer); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) { + cacheCtx.topology().readLock(); + + try { + if (!cacheCtx.topology().topologyVersion().equals(topVer)) { + retry = true; + + break; + } + + m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap()); + } + finally { + cacheCtx.topology().readUnlock(); + } + } } } + while (retry); for (GridClientPartitionTopology top : clientTops.values()) { GridDhtPartitionMap locMap = top.localPartitionMap(); @@ -925,6 +969,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + if (cacheCtx != null && cacheCtx.startTopologyVersion() != null && + msg.topologyVersion() != AffinityTopologyVersion.NONE && // Backward compatibility. + cacheCtx.startTopologyVersion().compareTo(msg.topologyVersion()) > 0) + continue; + if (cacheCtx != null && !cacheCtx.started()) continue; // Can safely ignore background exchange. @@ -971,6 +1020,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + if (cacheCtx != null && cacheCtx.startTopologyVersion() != null && + msg.topologyVersion() != AffinityTopologyVersion.NONE && // Backward compatibility. + cacheCtx.startTopologyVersion().compareTo(msg.topologyVersion()) > 0) + continue; + GridDhtPartitionTopology top = null; if (cacheCtx == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 2f2944d..4fc4704 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -959,7 +959,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, clientOnlyExchange, - cctx.versions().last()); + cctx.versions().last(), + id.topologyVersion()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 83fbb1a..bb6ea27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -23,12 +23,14 @@ import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -49,6 +51,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** */ private boolean client; + /** Topology version. */ + private AffinityTopologyVersion topVer; + /** * Required by {@link Externalizable}. */ @@ -63,10 +68,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes */ public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, boolean client, - @Nullable GridCacheVersion lastVer) { + @Nullable GridCacheVersion lastVer, + @NotNull AffinityTopologyVersion topVer) { super(exchId, lastVer); this.client = client; + this.topVer = topVer; } /** @@ -140,6 +147,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); + case 7: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + } return true; @@ -172,6 +185,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); + case 7: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class); @@ -184,7 +205,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3fe6cce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java new file mode 100644 index 0000000..e275a49 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ConcurrentHashMap8; + +/** + * + */ +public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** partitioned cache name. */ + protected static String CACHE = null; + + /** */ + private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>(); + + /** */ + private volatile boolean record = false; + + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = new DelayableCommunicationSpi(); + + commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + commSpi.setTcpNoDelay(true); + + iCfg.setCommunicationSpi(commSpi); + + return iCfg; + } + + /** + * Helps to delay GridDhtPartitionsFullMessages. + */ + public class DelayableCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(final ClusterNode node, final Message msg, + final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { + final Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridDhtPartitionsFullMessage && record && + ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) { + rs.putIfAbsent(node.id(), new Runnable() { + @Override public void run() { + DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure); + } + }); + } + else + try { + super.sendMessage(node, msg, ackClosure); + } + catch (Exception e) { + U.log(null, e); + } + + } + } + + /** + * @throws Exception e. + */ + public void test() throws Exception { + startGrid(0); + + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(); + + cfg.setName(CACHE); + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setBackups(1); + + ignite(0).getOrCreateCache(cfg); + + startGrid(1); + startGrid(2); + startGrid(3); + + awaitPartitionMapExchange(true); + + for (int i = 0; i < 2; i++) { + stopGrid(3); + + awaitPartitionMapExchange(true); + + startGrid(3); + + awaitPartitionMapExchange(true); + } + + assert rs.isEmpty(); + + record = true; + + while (rs.size() < 3) { // N - 1 nodes. + U.sleep(10); + } + + ignite(0).destroyCache(CACHE); + + ignite(0).getOrCreateCache(cfg); + + awaitPartitionMapExchange(); + + for (Runnable r : rs.values()) { + r.run(); + } + + U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. + + stopGrid(3); // Forces exchange at all nodes. + + awaitPartitionMapExchange(); + + long topVer1 = grid(1).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion() + .topologyVersion(); + long topVer2 = grid(2).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion() + .topologyVersion(); + + stopGrid(0); // Should force exchange. + + awaitPartitionMapExchange(); + + // Will fail in case exchange-workers are dead. + assert grid(1).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion() + .topologyVersion() > topVer1; + assert grid(2).context().cache().internalCache(CACHE).context().affinity().affinityTopologyVersion() + .topologyVersion() > topVer2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + +}
