Repository: ignite
Updated Branches:
  refs/heads/ignite-slow-rebal 1597e6ca1 -> a950de966


slow rebalancing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a950de96
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a950de96
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a950de96

Branch: refs/heads/ignite-slow-rebal
Commit: a950de96624dcef64b6e8935c4f82ef7629035b3
Parents: 1597e6c
Author: Denis Magda <[email protected]>
Authored: Wed Sep 30 12:30:47 2015 +0300
Committer: Denis Magda <[email protected]>
Committed: Wed Sep 30 12:30:47 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 48 ++++++++++++++++++--
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 .../GridDhtPartitionsSingleMessage.java         |  4 +-
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index eb76233..630d57a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -626,6 +627,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * Schedules next full partitions update.
      */
     public void scheduleResendPartitions() {
+        log.info("scheduleResendPartitoins");
+
         ResendTimeoutObject timeout = pendingResend.get();
 
         if (timeout == null || timeout.started()) {
@@ -668,6 +671,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 log.debug("Refreshing local partitions from non-oldest node: " 
+
                     cctx.localNodeId());
 
+            log.info("Refreshing local partitions from non-oldest node: 
[locNode= " +
+                cctx.localNodeId() + ']');
+
             sendLocalPartitions(oldest, null);
         }
     }
@@ -701,9 +707,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) 
{
         GridDhtPartitionsFullMessage m = new 
GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
 
+        List<String> caches = new ArrayList<>();
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal() && cacheCtx.started())
+            if (!cacheCtx.isLocal() && cacheCtx.started()) {
                 m.addFullPartitionsMap(cacheCtx.cacheId(), 
cacheCtx.topology().partitionMap(true));
+                caches.add(cacheCtx.name());
+            }
         }
 
         // It is important that client topologies be added after contexts.
@@ -713,9 +723,20 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + 
", msg=" + m + ']');
 
+        log.info("Before sending all partitions: [rmtNodes=" + nodes + ']');
+
         for (ClusterNode node : nodes) {
             try {
+                long time = System.currentTimeMillis();
+
+                log.info("Start sending all partitions [caches=" + caches + ", 
time=" + new Date(time) + ", node=" + node + ']');
+
                 cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
+
+                long passed = System.currentTimeMillis();
+
+                log.info("Stop sending all partitions [caches=" + caches + 
",time=" + new Date(passed) + ", diff=" + (passed - time) +
+                    ", node=" + node + ']');
             }
             catch (ClusterTopologyCheckedException ignore) {
                 if (log.isDebugEnabled())
@@ -904,6 +925,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 if (log.isDebugEnabled())
                     log.debug("Received full partition update [node=" + 
node.id() + ", msg=" + msg + ']');
 
+                log.info("Received full partition update [node=" + node.id() + 
", msg=" + msg + ", time=" + new Date() + ']');
+
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
msg.partitions().entrySet()) {
@@ -925,8 +948,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         updated |= top.update(null, entry.getValue()) != null;
                 }
 
-                if (!cctx.kernalContext().clientNode() && updated)
+                if (!cctx.kernalContext().clientNode() && updated) {
+                    log.info("refreshPartitions: processFullPartitionUpdate");
                     refreshPartitions();
+                }
             }
             else
                 exchangeFuture(msg.exchangeId(), null, 
null).onReceive(node.id(), msg);
@@ -950,6 +975,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     log.debug("Received local partition update [nodeId=" + 
node.id() + ", parts=" +
                         msg + ']');
 
+                log.info("Received local partition update [nodeId=" + 
node.id() + ", parts=" +
+                    msg + ']');
+
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg.partitions().entrySet()) {
@@ -968,8 +996,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         updated |= top.update(null, entry.getValue()) != null;
                 }
 
-                if (updated)
+                if (updated) {
+                    log.info("Partitions updated, schedule: [sender=" + node + 
']');
                     scheduleResendPartitions();
+                }
             }
             else {
                 if (msg.client()) {
@@ -1149,6 +1179,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     // If not first preloading and no more topology events 
present,
                     // then we periodically refresh partition map.
                     if (!cctx.kernalContext().clientNode() && futQ.isEmpty() 
&& preloadFinished) {
+                        log.info("refreshPartitions: preloadFinished");
+
                         refreshPartitions(timeout);
 
                         timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1214,8 +1246,11 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                             startEvtFired = true;
 
-                            if (!cctx.kernalContext().clientNode() && changed 
&& futQ.isEmpty())
+                            if (!cctx.kernalContext().clientNode() && changed 
&& futQ.isEmpty()) {
+                                log.info("refreshPartitions: ExcahngeWorker 
body");
+
                                 refreshPartitions();
+                            }
                         }
                         else {
                             if (log.isDebugEnabled())
@@ -1311,8 +1346,11 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         return;
 
                     try {
-                        if (started.compareAndSet(false, true))
+                        if (started.compareAndSet(false, true)) {
+                            log.info("refreshPartitions: ResendTimeoutObject");
+
                             refreshPartitions();
+                        }
                     }
                     finally {
                         busyLock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b91a2de..50e2e41 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -95,7 +95,7 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (parts != null)
+        if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
     }
 
@@ -200,4 +200,4 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
         return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", 
parts != null ? parts.size() : 0,
             "super", super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 9b6dcf7..85d8d0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -98,7 +98,7 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (parts != null)
+        if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
     }
 
@@ -188,4 +188,4 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     @Override public String toString() {
         return S.toString(GridDhtPartitionsSingleMessage.class, this, 
super.toString());
     }
-}
\ No newline at end of file
+}

Reply via email to