Repository: ignite Updated Branches: refs/heads/ignite-4154-opt2 74d0ecf53 -> 852a7ec85
ignite-4154 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/852a7ec8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/852a7ec8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/852a7ec8 Branch: refs/heads/ignite-4154-opt2 Commit: 852a7ec85f62fee52b5360f148521946478808ec Parents: 74d0ecf Author: sboikov <[email protected]> Authored: Mon Nov 21 15:24:16 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 21 15:24:16 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 1 + .../dht/GridDhtPartitionTopologyImpl.java | 19 +++---- .../GridDhtPartitionsExchangeFuture.java | 59 ++++++++++++++++---- 3 files changed, 57 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/852a7ec8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 16ce38a..c418627 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1128,6 +1128,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ", singleMsgUpdateTime=" + exchFut.singleMsgUpdateTime + ", singleMsgUpdateCnt=" + exchFut.singleMsgUpdateCnt + ", singleMsgUpdateMaxTime=" + exchFut.singleMsgUpdateMaxTime + + ", singleMsgUpdateMinTime=" + exchFut.singleMsgUpdateMinTime + ", err=" + err + ']'); } else http://git-wip-us.apache.org/repos/asf/ignite/blob/852a7ec8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f50116d..7e74b91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1174,22 +1174,17 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (cntrMap != null) { for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); + Integer p = e.getKey(); - if (cntr == null || cntr < e.getValue()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - - for (int i = 0; i < locParts.length; i++) { - GridDhtLocalPartition part = locParts[i]; + Long cntr = this.cntrMap.get(p); - if (part == null) - continue; + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(p, e.getValue()); - Long cntr = cntrMap.get(part.id()); + GridDhtLocalPartition part = locParts[p]; - if (cntr != null) - part.updateCounter(cntr); + if (part != null) + part.updateCounter(e.getValue()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/852a7ec8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a4f615f..66f3ed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -111,6 +111,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private final Set<UUID> remaining = new HashSet<>(); /** */ + @GridToStringExclude + private int pendingSingleUpdates; + + /** */ public int singleMsgUpdateCnt; /** */ @@ -120,6 +124,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT public long singleMsgUpdateMaxTime; /** */ + public long singleMsgUpdateMinTime = Long.MAX_VALUE; + + /** */ @GridToStringExclude private List<ClusterNode> srvNodes; @@ -1244,34 +1251,66 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { boolean allReceived = false; + boolean updateSingleMap = false; synchronized (mux) { assert crd != null; if (crd.isLocal()) { - long start = U.currentTimeMillis(); - if (remaining.remove(node.id())) { - updatePartitionSingleMap(msg); + updateSingleMap = true; + + pendingSingleUpdates++; allReceived = remaining.isEmpty(); } singleMsgUpdateCnt++; + } + else + singleMsgs.put(node, msg); + } - long time = U.currentTimeMillis() - start; + if (updateSingleMap) { + long start = U.currentTimeMillis(); - if (time > singleMsgUpdateMaxTime) - singleMsgUpdateMaxTime = time; + try { + updatePartitionSingleMap(msg); + } + finally { + synchronized (mux) { + long time = U.currentTimeMillis() - start; - singleMsgUpdateTime += time; + if (time > singleMsgUpdateMaxTime) + singleMsgUpdateMaxTime = time; + if (time < singleMsgUpdateMinTime) + singleMsgUpdateMinTime = time; + + singleMsgUpdateTime += time; + + assert pendingSingleUpdates > 0; + + pendingSingleUpdates--; + + if (pendingSingleUpdates == 0) + mux.notifyAll(); + } } - else - singleMsgs.put(node, msg); } - if (allReceived) + if (allReceived) { + synchronized (mux) { + try { + while (pendingSingleUpdates > 0) + U.wait(mux); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates."); + } + } + onAllReceived(false); + } } /**
