Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-opt2 74d0ecf53 -> 852a7ec85


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/852a7ec8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/852a7ec8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/852a7ec8

Branch: refs/heads/ignite-4154-opt2
Commit: 852a7ec85f62fee52b5360f148521946478808ec
Parents: 74d0ecf
Author: sboikov <[email protected]>
Authored: Mon Nov 21 15:24:16 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 21 15:24:16 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  1 +
 .../dht/GridDhtPartitionTopologyImpl.java       | 19 +++----
 .../GridDhtPartitionsExchangeFuture.java        | 59 ++++++++++++++++----
 3 files changed, 57 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/852a7ec8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 16ce38a..c418627 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1128,6 +1128,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 ", singleMsgUpdateTime=" + exchFut.singleMsgUpdateTime +
                 ", singleMsgUpdateCnt=" + exchFut.singleMsgUpdateCnt +
                 ", singleMsgUpdateMaxTime=" + exchFut.singleMsgUpdateMaxTime +
+                ", singleMsgUpdateMinTime=" + exchFut.singleMsgUpdateMinTime +
                 ", err=" + err + ']');
         }
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/852a7ec8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f50116d..7e74b91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1174,22 +1174,17 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
-                    Long cntr = this.cntrMap.get(e.getKey());
+                    Integer p = e.getKey();
 
-                    if (cntr == null || cntr < e.getValue())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-
-                for (int i = 0; i < locParts.length; i++) {
-                    GridDhtLocalPartition part = locParts[i];
+                    Long cntr = this.cntrMap.get(p);
 
-                    if (part == null)
-                        continue;
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(p, e.getValue());
 
-                    Long cntr = cntrMap.get(part.id());
+                    GridDhtLocalPartition part = locParts[p];
 
-                    if (cntr != null)
-                        part.updateCounter(cntr);
+                    if (part != null)
+                        part.updateCounter(e.getValue());
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/852a7ec8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a4f615f..66f3ed1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -111,6 +111,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private final Set<UUID> remaining = new HashSet<>();
 
     /** */
+    @GridToStringExclude
+    private int pendingSingleUpdates;
+
+    /** */
     public int singleMsgUpdateCnt;
 
     /** */
@@ -120,6 +124,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     public long singleMsgUpdateMaxTime;
 
     /** */
+    public long singleMsgUpdateMinTime = Long.MAX_VALUE;
+
+    /** */
     @GridToStringExclude
     private List<ClusterNode> srvNodes;
 
@@ -1244,34 +1251,66 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
      */
     private void processMessage(ClusterNode node, 
GridDhtPartitionsSingleMessage msg) {
         boolean allReceived = false;
+        boolean updateSingleMap = false;
 
         synchronized (mux) {
             assert crd != null;
 
             if (crd.isLocal()) {
-                long start = U.currentTimeMillis();
-
                 if (remaining.remove(node.id())) {
-                    updatePartitionSingleMap(msg);
+                    updateSingleMap = true;
+
+                    pendingSingleUpdates++;
 
                     allReceived = remaining.isEmpty();
                 }
 
                 singleMsgUpdateCnt++;
+            }
+            else
+                singleMsgs.put(node, msg);
+        }
 
-                long time = U.currentTimeMillis() - start;
+        if (updateSingleMap) {
+            long start = U.currentTimeMillis();
 
-                if (time > singleMsgUpdateMaxTime)
-                    singleMsgUpdateMaxTime = time;
+            try {
+                updatePartitionSingleMap(msg);
+            }
+            finally {
+                synchronized (mux) {
+                    long time = U.currentTimeMillis() - start;
 
-                singleMsgUpdateTime += time;
+                    if (time > singleMsgUpdateMaxTime)
+                        singleMsgUpdateMaxTime = time;
+                    if (time < singleMsgUpdateMinTime)
+                        singleMsgUpdateMinTime = time;
+
+                    singleMsgUpdateTime += time;
+
+                    assert pendingSingleUpdates > 0;
+
+                    pendingSingleUpdates--;
+
+                    if (pendingSingleUpdates == 0)
+                        mux.notifyAll();
+                }
             }
-            else
-                singleMsgs.put(node, msg);
         }
 
-        if (allReceived)
+        if (allReceived) {
+            synchronized (mux) {
+                try {
+                    while (pendingSingleUpdates > 0)
+                        U.wait(mux);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    U.warn(log, "Failed to wait for partition map updates.");
+                }
+            }
+
             onAllReceived(false);
+        }
     }
 
     /**

Reply via email to