Repository: ignite
Updated Branches:
  refs/heads/ignite-slow-rebal 1597e6ca1 -> a950de966
slow rebalancing Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a950de96 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a950de96 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a950de96 Branch: refs/heads/ignite-slow-rebal Commit: a950de96624dcef64b6e8935c4f82ef7629035b3 Parents: 1597e6c Author: Denis Magda <[email protected]> Authored: Wed Sep 30 12:30:47 2015 +0300 Committer: Denis Magda <[email protected]> Committed: Wed Sep 30 12:30:47 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 48 ++++++++++++++++++-- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- .../GridDhtPartitionsSingleMessage.java | 4 +- 3 files changed, 47 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index eb76233..630d57a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -626,6 +627,8 @@ public class GridCachePartitionExchangeManager<K, V> extends 
GridCacheSharedMana * Schedules next full partitions update. */ public void scheduleResendPartitions() { + log.info("scheduleResendPartitoins"); + ResendTimeoutObject timeout = pendingResend.get(); if (timeout == null || timeout.started()) { @@ -668,6 +671,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Refreshing local partitions from non-oldest node: " + cctx.localNodeId()); + log.info("Refreshing local partitions from non-oldest node: [locNode= " + + cctx.localNodeId() + ']'); + sendLocalPartitions(oldest, null); } } @@ -701,9 +707,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) { GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); + List<String> caches = new ArrayList<>(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started()) + if (!cacheCtx.isLocal() && cacheCtx.started()) { m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + caches.add(cacheCtx.name()); + } } // It is important that client topologies be added after contexts. 
@@ -713,9 +723,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); + log.info("Before sending all partitions: [rmtNodes=" + nodes + ']'); + for (ClusterNode node : nodes) { try { + long time = System.currentTimeMillis(); + + log.info("Start sending all partitions [caches=" + caches + ", time=" + new Date(time) + ", node=" + node + ']'); + cctx.io().sendNoRetry(node, m, SYSTEM_POOL); + + long passed = System.currentTimeMillis(); + + log.info("Stop sending all partitions [caches=" + caches + ",time=" + new Date(passed) + ", diff=" + (passed - time) + + ", node=" + node + ']'); } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) @@ -904,6 +925,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Received full partition update [node=" + node.id() + ", msg=" + msg + ']'); + log.info("Received full partition update [node=" + node.id() + ", msg=" + msg + ", time=" + new Date() + ']'); + boolean updated = false; for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { @@ -925,8 +948,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana updated |= top.update(null, entry.getValue()) != null; } - if (!cctx.kernalContext().clientNode() && updated) + if (!cctx.kernalContext().clientNode() && updated) { + log.info("refreshPartitions: processFullPartitionUpdate"); refreshPartitions(); + } } else exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); @@ -950,6 +975,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana log.debug("Received local partition update [nodeId=" + node.id() + ", parts=" + msg + ']'); + log.info("Received local partition update [nodeId=" + node.id() + ", parts=" + + msg + ']'); + boolean updated = false; for 
(Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { @@ -968,8 +996,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana updated |= top.update(null, entry.getValue()) != null; } - if (updated) + if (updated) { + log.info("Partitions updated, schedule: [sender=" + node + ']'); scheduleResendPartitions(); + } } else { if (msg.client()) { @@ -1149,6 +1179,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // If not first preloading and no more topology events present, // then we periodically refresh partition map. if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) { + log.info("refreshPartitions: preloadFinished"); + refreshPartitions(timeout); timeout = cctx.gridConfig().getNetworkTimeout(); @@ -1214,8 +1246,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana startEvtFired = true; - if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) + if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) { + log.info("refreshPartitions: ExcahngeWorker body"); + refreshPartitions(); + } } else { if (log.isDebugEnabled()) @@ -1311,8 +1346,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; try { - if (started.compareAndSet(false, true)) + if (started.compareAndSet(false, true)) { + log.info("refreshPartitions: ResendTimeoutObject"); + refreshPartitions(); + } } finally { busyLock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index b91a2de..50e2e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -95,7 +95,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (parts != null) + if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); } @@ -200,4 +200,4 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a950de96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9b6dcf7..85d8d0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -98,7 +98,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes 
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (parts != null) + if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); } @@ -188,4 +188,4 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @Override public String toString() { return S.toString(GridDhtPartitionsSingleMessage.class, this, super.toString()); } -} \ No newline at end of file +}
