1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7e91796 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7e91796 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7e91796 Branch: refs/heads/ignite-1093-2 Commit: b7e9179604833dad2b358917413d092989f2bd55 Parents: a8b323d Author: Anton Vinogradov <[email protected]> Authored: Wed Sep 23 13:00:42 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed Sep 23 13:00:42 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 42 ++++++++++++----- .../dht/preloader/GridDhtPartitionSupplier.java | 49 +++++++++++--------- 2 files changed, 57 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b7e91796/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index bbb6a21..345e3bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -64,11 +64,11 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; @@ -391,7 +391,7 @@ public class GridDhtPartitionDemander { //Check remote node rebalancing API version. if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) { - fut.appendPartitions(node.id(), d.partitions()); + fut.appendPartitions(node.id(), d.partitions(), d.updateSequence()); int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); @@ -418,10 +418,13 @@ public class GridDhtPartitionDemander { try { if (!topologyChanged(fut)) { - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); + cctx.io().sendOrderedMessage(node, + GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + cnt + ", partitions count=" + sParts.get(cnt).size() + " (" + partitionsList(sParts.get(cnt)) + ")]"); + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + cnt + ", partitions count=" + sParts.get(cnt).size() + + " (" + partitionsList(sParts.get(cnt)) + ")]"); } else { @@ -441,7 +444,7 @@ public class GridDhtPartitionDemander { else { DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); - fut.appendPartitions(node.id(), d.partitions()); + fut.appendPartitions(node.id(), d.partitions(), d.updateSequence()); dw.run(node, d); } @@ -513,7 +516,7 @@ public class GridDhtPartitionDemander { assert node != null; - if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut)) + if (!fut.topologyVersion().equals(topVer) || topologyChanged(fut) || !fut.isActual(id, supply.updateSequence())) return; if (log.isDebugEnabled()) @@ -744,7 +747,7 @@ public class GridDhtPartitionDemander { /** * */ - public static class SyncFuture extends GridFutureAdapter<Object> { + public static class SyncFuture extends GridFutureAdapter<Boolean> { /** */ private static final long serialVersionUID = 1L; @@ -757,8 +760,8 @@ public class GridDhtPartitionDemander { /** */ private final IgniteLogger log; - /** Remaining. */ - private final Map<UUID, IgniteBiTuple<Long, Collection<Integer>>> remaining = new HashMap<>(); + /** Remaining. T3: startTime, partitions, updateSequence */ + private final Map<UUID, T3<Long, Collection<Integer>, Long>> remaining = new HashMap<>(); /** Missed. */ private final Map<UUID, Collection<Integer>> missed = new HashMap<>(); @@ -819,6 +822,17 @@ public class GridDhtPartitionDemander { } /** + * @param nodeId Node id. + * @param updateSeq Update sequence. + * @return true in case future created for specified updateSeq, false in other case. + */ + private boolean isActual(UUID nodeId, long updateSeq) { + T3<Long, Collection<Integer>, Long> t = remaining.get(nodeId); + + return t != null ? t.get3().equals(updateSeq) : false; + } + + /** * @return Is dummy (created at demander creation). */ private boolean isDummy() { @@ -829,11 +843,11 @@ public class GridDhtPartitionDemander { * @param nodeId Node id. * @param parts Parts. */ - private void appendPartitions(UUID nodeId, Collection<Integer> parts) { + private void appendPartitions(UUID nodeId, Collection<Integer> parts, long updateSeq) { lock.lock(); try { - remaining.put(nodeId, new IgniteBiTuple<>(U.currentTimeMillis(), parts)); + remaining.put(nodeId, new T3<>(U.currentTimeMillis(), parts, updateSeq)); } finally { lock.unlock(); @@ -1014,12 +1028,16 @@ public class GridDhtPartitionDemander { U.log(log, ("Reassigning partitions that were missed: " + m)); cctx.shared().exchange().forceDummyExchange(true, exchFut); + + onDone(false); //Finished but has missed partitions and forced dummy exchange + + return; } cctx.shared().exchange().scheduleResendPartitions(); } - onDone(); + onDone(true); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b7e91796/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index d33dc5a..a4bd134 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -131,7 +131,12 @@ class GridDhtPartitionSupplier { private static void clearContexts( ConcurrentHashMap8<T4, SupplyContext> map, IgniteLogger log, GridCacheContext<?, ?> cctx) { for (Map.Entry<T4, SupplyContext> entry : map.entrySet()) { - clearContext(map, entry.getKey(), entry.getValue(), log, cctx); + T4 t = entry.getKey(); + + SupplyContext sc = entry.getValue(); + + if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) + clearContext(map, t, sc, log); } } @@ -139,34 +144,31 @@ class GridDhtPartitionSupplier { * Clear context. * * @param map Context map. + * @param t id. + * @param sc Supply context. * @param log Logger. + * @return true in case context was removed. */ private static boolean clearContext( - ConcurrentHashMap8<T4, SupplyContext> map, - T4 t, - SupplyContext sc, - IgniteLogger log, - GridCacheContext<?, ?> cctx) { - - if (!t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) { - Iterator it = sc.entryIt; - - if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { - try { - synchronized (map) { - if (!((GridCloseableIterator)it).isClosed()) - ((GridCloseableIterator)it).close(); - } - } - catch (IgniteCheckedException e) { - log.error("Iterator close failed.", e); + final ConcurrentHashMap8<T4, SupplyContext> map, + final T4 t, + final SupplyContext sc, + final IgniteLogger log) { + final Iterator it = sc.entryIt; + + if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { + try { + synchronized (it) { + if (!((GridCloseableIterator)it).isClosed()) + ((GridCloseableIterator)it).close(); } } - - return map.remove(t, sc); + catch (IgniteCheckedException e) { + log.error("Iterator close failed.", e); + } } - return false; + return map.remove(t, sc); } /** @@ -528,6 +530,9 @@ class GridDhtPartitionSupplier { if (log.isDebugEnabled()) log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) + return true; + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); // Throttle preloading.
