1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93caa0b8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93caa0b8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93caa0b8 Branch: refs/heads/ignite-1093-2 Commit: 93caa0b8bb9eb4f5f667d0533c15ce7a9efcdc17 Parents: 2cb397a Author: Anton Vinogradov <[email protected]> Authored: Fri Sep 18 21:21:17 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Fri Sep 18 21:21:17 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 11 ++- .../dht/preloader/GridDhtPartitionSupplier.java | 99 +++++++++----------- 2 files changed, 52 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 596ec2f..a2f8c01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -827,7 +827,7 @@ public class GridDhtPartitionDemander { lock.lock(); try { - remaining.put(nodeId, new IgniteBiTuple<>(System.currentTimeMillis(), parts)); + remaining.put(nodeId, new IgniteBiTuple<>(U.currentTimeMillis(), parts)); } finally { lock.unlock(); @@ -949,7 +949,8 @@ public class GridDhtPartitionDemander { parts.remove(p); if (parts.isEmpty()) { - U.log(log, ("Completed rebalancing [cache=" + cctx.name() + + U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") + + "rebalancing [cache=" + cctx.name() + ", fromNode=" + nodeId + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); @@ -988,6 +989,9 @@ public class GridDhtPartitionDemander { */ private void checkIsDone() { if (remaining.isEmpty()) { + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt)) + preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); + if (log.isDebugEnabled()) log.debug("Completed sync future."); @@ -1006,9 +1010,6 @@ public class GridDhtPartitionDemander { cctx.shared().exchange().scheduleResendPartitions(); - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt)) - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); - onDone(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/93caa0b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 50d64f9..1d8572a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -24,7 +24,6 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -41,12 +40,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.T4; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -72,7 +70,7 @@ class GridDhtPartitionSupplier { private IgnitePredicate<GridCacheEntryInfo> preloadPred; /** Supply context map. */ - private final ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>(); + private final ConcurrentHashMap8<T4, SupplyContext> scMap = new ConcurrentHashMap8<>(); /** Rebalancing listener. */ private GridLocalEventListener lsnr; @@ -98,19 +96,8 @@ class GridDhtPartitionSupplier { void start() { lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { - if (evt instanceof CacheRebalancingEvent) { - ClusterNode node = ((CacheRebalancingEvent)evt).discoveryNode(); - - int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - - for (int idx = 0; idx < lsnrCnt; idx++) { - T2<UUID, Integer> scId = new T2<>(node.id(), idx); - - tryClearContext(scMap, scId, log); - } - } - else if (evt instanceof DiscoveryEvent) { - scMap.clear(); + if (evt instanceof DiscoveryEvent) { + clearContexts(scMap, log, cctx); } else { assert false; @@ -118,7 +105,9 @@ class GridDhtPartitionSupplier { } }; - cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED); + //todo: rebalance stopped. + + cctx.events().addListener(lsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); startOldListeners(); } @@ -133,19 +122,33 @@ class GridDhtPartitionSupplier { } /** - * Clear context by id. + * Clear contexts. * * @param map Context map. - * @param scId Context id. * @param log Logger. + * @param cctx Context. */ - private static void tryClearContext( - ConcurrentHashMap8<T2, SupplyContext> map, - T2<UUID, Integer> scId, - IgniteLogger log) { - SupplyContext sc = map.get(scId); + private static void clearContexts( + ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) { + for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) { + clearContext(map, entry.getKey(), entry.getValue(), log, cctx); + } + } - if (sc != null) { + /** + * Clear context. + * + * @param map Context map. + * @param log Logger. + */ + private static boolean clearContext( + ConcurrentHashMap8<T4, SupplyContext> map, + T4 t, + SupplyContext sc, + IgniteLogger log, + GridCacheContext<?, ?> cctx) { + + if (!t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) { Iterator it = sc.entryIt; if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { @@ -159,9 +162,11 @@ class GridDhtPartitionSupplier { log.error("Iterator close failed.", e); } } + + return map.remove(t, sc); } - map.remove(scId, sc); + return false; } /** @@ -190,21 +195,16 @@ class GridDhtPartitionSupplier { ClusterNode node = cctx.discovery().node(id); - T2<UUID, Integer> scId = new T2<>(id, idx); + T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence()); try { SupplyContext sctx = scMap.get(scId); - if (sctx == null) { - if (d.partitions().isEmpty()) - return; - } - else { - if (!sctx.top.equals(d.topologyVersion())) { - tryClearContext(scMap, scId, log); + if (sctx == null && d.partitions().isEmpty()) + return; - sctx = scMap.get(scId); - } + if (sctx != null && !d.partitions().isEmpty()) { + assert false; } long bCnt = 0; @@ -277,7 +277,7 @@ class GridDhtPartitionSupplier { if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr, d.topologyVersion()); + saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr); swapLsnr = null; @@ -321,8 +321,7 @@ class GridDhtPartitionSupplier { partIt, null, swapLsnr, - part, - d.topologyVersion()); + part); } } @@ -353,7 +352,7 @@ class GridDhtPartitionSupplier { if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr, d.topologyVersion()); + saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr); swapLsnr = null; @@ -436,8 +435,7 @@ class GridDhtPartitionSupplier { partIt, null, null, - part, - d.topologyVersion()); + part); } } @@ -465,7 +463,7 @@ class GridDhtPartitionSupplier { if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr, d.topologyVersion()); + saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr); swapLsnr = null; @@ -557,13 +555,12 @@ class GridDhtPartitionSupplier { * @param swapLsnr Swap listener. */ private void saveSupplyContext( - T2 t, + T4 t, int phase, Iterator<Integer> partIt, int part, - Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr, - AffinityTopologyVersion top) { - scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part, top)); + Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) { + scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part)); } /** @@ -585,9 +582,6 @@ class GridDhtPartitionSupplier { /** Partition. */ private final int part; - /** Topology version. */ - private final AffinityTopologyVersion top; - /** * @param phase Phase. * @param partIt Partition iterator. @@ -596,13 +590,12 @@ class GridDhtPartitionSupplier { * @param part Partition. */ public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt, - GridCacheEntryInfoCollectSwapListener swapLsnr, int part, AffinityTopologyVersion top) { + GridCacheEntryInfoCollectSwapListener swapLsnr, int part) { this.phase = phase; this.partIt = partIt; this.entryIt = entryIt; this.swapLsnr = swapLsnr; this.part = part; - this.top = top; } }
