1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be4addef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be4addef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be4addef Branch: refs/heads/ignite-1093-2 Commit: be4addef01a28f54b887dc59c6b194e46edfdb7f Parents: 8ba913b Author: Anton Vinogradov <[email protected]> Authored: Tue Oct 20 14:20:45 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Oct 20 14:20:45 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 10 +++--- .../dht/preloader/GridDhtPartitionDemander.java | 32 +++++++++++--------- 2 files changed, 24 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/be4addef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5251c61..6793f9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1338,7 +1338,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana marsR.call();//Marshaller cache rebalancing launches in sync way. } catch (Exception ex) { - U.error(log, "Failed to send partition demand message to node", ex); + if (log.isDebugEnabled()) + log.debug("Failed to send initial demand request to node"); continue; } @@ -1356,12 +1357,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (r == null) return false; - if (!r.call()) - return false; + if (!r.call()) + return false; } } catch (Exception ex) { - U.error(log, "Failed to send partition demand message to node"); + if (log.isDebugEnabled()) + log.debug("Failed to send initial demand request to node"); return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/be4addef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index f5f4c56..fbb187b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -337,20 +337,23 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage d = e.getValue(); - d.timeout(cctx.config().getRebalanceTimeout()); - d.workerId(0);//old api support. + fut.appendPartitions(node.id(), d.partitions());//Future preparation. + } + + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { + final ClusterNode node = e.getKey(); final CacheConfiguration cfg = cctx.config(); + final Collection<Integer> parts = fut.remaining.get(node.id()).get2(); + + GridDhtPartitionDemandMessage d = e.getValue(); + //Check remote node rebalancing API version. if (new Integer(1).equals(node.attribute(IgniteNodeAttributes.REBALANCING_VERSION))) { U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + - ", topology=" + d.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - - Collection<Integer> parts = new HashSet<>(d.partitions()); - - fut.appendPartitions(node.id(), parts); + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); @@ -367,7 +370,6 @@ public class GridDhtPartitionDemander { sParts.get(cnt++ % lsnrCnt).add(it.next()); for (cnt = 0; cnt < lsnrCnt; cnt++) { - if (!sParts.get(cnt).isEmpty()) { // Create copy. @@ -375,9 +377,10 @@ public class GridDhtPartitionDemander { initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt)); initD.updateSequence(fut.updateSeq); + initD.timeout(cctx.config().getRebalanceTimeout()); cctx.io().sendOrderedMessage(node, - GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout()); + GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), initD.timeout()); if (log.isDebugEnabled()) log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + @@ -388,12 +391,13 @@ public class GridDhtPartitionDemander { } else { U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + - ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + "]"); + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); + d.timeout(cctx.config().getRebalanceTimeout()); + d.workerId(0);//old api support. - fut.appendPartitions(node.id(), d.partitions()); + DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); dw.run(node, d); }
