Repository: ignite Updated Branches: refs/heads/master ac94426ce -> eee638d3d
ignite-6124 Added missed initialization of merged join exchanges in GridDhtPartitionsExchangeFuture.onBecomeCoordinator Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0c5dca9a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0c5dca9a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0c5dca9a Branch: refs/heads/master Commit: 0c5dca9a807c8f024e3477c1b18ddb3f237124b2 Parents: d22631e Author: sboikov <[email protected]> Authored: Wed Aug 23 16:44:04 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 23 16:44:04 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 71 +++++++++++++------- .../GridDhtPartitionsExchangeFuture.java | 7 +- .../dht/preloader/InitNewCoordinatorFuture.java | 22 +++++- 3 files changed, 71 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 984721b..bd34a5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -309,9 +309,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!crdInitFut.isDone() && !msg.restoreState()) { GridDhtPartitionExchangeId exchId = msg.exchangeId(); - log.info("Waiting for coordinator initialization [node=" + node.id() + - ", nodeOrder=" + node.order() + - ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']'); + if (log.isInfoEnabled()) { + log.info("Waiting for coordinator initialization [node=" + node.id() + + ", nodeOrder=" + node.order() + + ", ver=" + (exchId != null ? exchId.topologyVersion() : null) + ']'); + } crdInitFut.listen(new CI1<IgniteInternalFuture>() { @Override public void apply(IgniteInternalFuture fut) { @@ -1821,18 +1823,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task; if (fut.initialVersion().compareTo(resVer) > 0) { - log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() + - ", resVer=" + resVer + - ", nextFutVer=" + fut.initialVersion() + ']'); + if (log.isInfoEnabled()) { + log.info("Merge exchange future on finish stop [curFut=" + curFut.initialVersion() + + ", resVer=" + resVer + + ", nextFutVer=" + fut.initialVersion() + ']'); + } break; } - log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() + - ", mergedFut=" + fut.initialVersion() + - ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) + - ", evtNode=" + fut.firstEvent().eventNode().id()+ - ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']'); + if (log.isInfoEnabled()) { + log.info("Merge exchange future on finish [curFut=" + curFut.initialVersion() + + ", mergedFut=" + fut.initialVersion() + + ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) + + ", evtNode=" + fut.firstEvent().eventNode().id()+ + ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']'); + } DiscoveryEvent evt = fut.firstEvent(); @@ -1843,8 +1849,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (evt.type() == EVT_NODE_JOINED) { final GridDhtPartitionsSingleMessage pendingMsg = fut.mergeJoinExchangeOnDone(curFut); - if (pendingMsg != null) + if (pendingMsg != null) { + if (log.isInfoEnabled()) { + log.info("Merged join exchange future on finish, will reply to node [" + + "curFut=" + curFut.initialVersion() + + ", mergedFut=" + fut.initialVersion() + + ", evtNode=" + evt.eventNode().id() + ']'); + } + curFut.waitAndReplyToNode(evt.eventNode().id(), pendingMsg); + } } } } @@ -1876,8 +1890,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer; if (exchMergeTestWaitVer != null) { - log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() + - ", waitVer=" + exchMergeTestWaitVer + ']'); + if (log.isInfoEnabled()) { + log.info("Exchange merge test, waiting for version [exch=" + curFut.initialVersion() + + ", waitVer=" + exchMergeTestWaitVer + ']'); + } long end = U.currentTimeMillis() + 10_000; @@ -1889,7 +1905,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; if (exchMergeTestWaitVer.equals(fut.initialVersion())) { - log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer); + if (log.isInfoEnabled()) + log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer); found = true; @@ -1923,7 +1940,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana DiscoveryEvent evt = fut.firstEvent(); if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) { - log.info("Stop merge, custom event found: " + evt); + if (log.isInfoEnabled()) + log.info("Stop merge, custom event found: " + evt); break; } @@ -1931,21 +1949,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ClusterNode node = evt.eventNode(); if (!curFut.context().supportsMergeExchanges(node)) { - log.info("Stop merge, node does not support merge: " + node); + if (log.isInfoEnabled()) + log.info("Stop merge, node does not support merge: " + node); break; } if (evt.type() == EVT_NODE_JOINED && cctx.cache().hasCachesReceivedFromJoin(node)) { - log.info("Stop merge, received caches from node: " + node); + if (log.isInfoEnabled()) + log.info("Stop merge, received caches from node: " + node); break; } - log.info("Merge exchange future [curFut=" + curFut.initialVersion() + - ", mergedFut=" + fut.initialVersion() + - ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) + - ", evtNode=" + fut.firstEvent().eventNode().id() + - ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']'); + if (log.isInfoEnabled()) { + log.info("Merge exchange future [curFut=" + curFut.initialVersion() + + ", mergedFut=" + fut.initialVersion() + + ", evt=" + IgniteUtils.gridEventName(fut.firstEvent().type()) + + ", evtNode=" + fut.firstEvent().eventNode().id() + + ", evtNodeClient=" + CU.clientNode(fut.firstEvent().eventNode())+ ']'); + } curFut.context().events().addEvent(fut.initialVersion(), fut.firstEvent(), @@ -1958,7 +1980,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } else { if (!task.skipForExchangeMerge()) { - log.info("Stop merge, custom task found: " + task); + if (log.isInfoEnabled()) + log.info("Stop merge, custom task found: " + task); break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 0dd6e4a..6decb44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -3204,13 +3204,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages(); + if (log.isInfoEnabled()) { log.info("New coordinator sends full message [ver=" + initialVersion() + ", resVer=" + fullMsg.resultTopologyVersion() + - ", nodes=" + F.nodeIds(msgs.keySet()) + ']'); + ", nodes=" + F.nodeIds(msgs.keySet()) + + ", mergedJoins=" + (mergedJoins != null ? mergedJoins.keySet() : null) + ']'); } - sendAllPartitions(fullMsg, msgs.keySet(), null, joinedNodeAff); + sendAllPartitions(fullMsg, msgs.keySet(), mergedJoins, joinedNodeAff); } return; http://git-wip-us.apache.org/repos/asf/ignite/blob/0c5dca9a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java index d0e619b..b5acd4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java @@ -196,6 +196,13 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture { } /** + * @return Messages for merged join exchanges. + */ + Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchangeMessages() { + return joinExchMsgs; + } + + /** * @return Full message is some of nodes received it from previous coordinator. */ GridDhtPartitionsFullMessage fullMessage() { @@ -247,7 +254,8 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture { if (fullMsg != null) { AffinityTopologyVersion resVer = fullMsg.resultTopologyVersion(); - for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); it.hasNext();) { + for (Iterator<Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage>> it = msgs.entrySet().iterator(); + it.hasNext();) { Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e = it.next(); GridDhtPartitionExchangeId msgVer = joinedNodes.get(e.getKey().id()); @@ -263,8 +271,16 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture { if (msgVer.topologyVersion().compareTo(resVer) > 0) it.remove(); - else - e.getValue().exchangeId(msgVer); + else { + GridDhtPartitionsSingleMessage msg = e.getValue(); + + msg.exchangeId(msgVer); + + if (joinExchMsgs == null) + joinExchMsgs = new HashMap<>(); + + joinExchMsgs.put(e.getKey().id(), msg); + } } } }
