IGNITE-5232 GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially, which leads to a significant increase in node start time on large clusters with SSL
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c460b78 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c460b78 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c460b78 Branch: refs/heads/ignite-5075 Commit: 4c460b78f7b0febc37940c8d65f91cb449fa4d54 Parents: 8476a19 Author: Anton Vinogradov <[email protected]> Authored: Thu May 25 16:27:46 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed May 31 13:50:29 2017 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 103 ++++++++++--------- 1 file changed, 52 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4c460b78/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 838ccc8..cdbae1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -328,41 +328,21 @@ public class GridDhtPartitionDemander { return new Runnable() { @Override public void run() { - try { - if (next != null) - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> f) { - try { - if (f.get()) // Not cancelled. 
- next.run(); // Starts next cache rebalancing (according to the order). - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug(e.getMessage()); - } + if (next != null) + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (f.get()) // Not cancelled. + next.run(); // Starts next cache rebalancing (according to the order). } - }); - - requestPartitions(fut, assigns); - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class); - - if (cause != null) - log.warning("Failed to send initial demand request to node. " + e.getMessage()); - else - log.error("Failed to send initial demand request to node.", e); - - fut.cancel(); - } - catch (Throwable th) { - log.error("Runtime error caught during initial demand request sending.", th); - - fut.cancel(); + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug(e.getMessage()); + } + } + }); - if (th instanceof Error) - throw th; - } + requestPartitions(fut, assigns); } }; } @@ -399,10 +379,7 @@ public class GridDhtPartitionDemander { * @param assigns Assignments. * @throws IgniteCheckedException If failed. */ - private void requestPartitions( - RebalanceFuture fut, - GridDhtPreloaderAssignments assigns - ) throws IgniteCheckedException { + private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assigns) { assert fut != null; if (topologyChanged(fut)) { @@ -411,7 +388,7 @@ public class GridDhtPartitionDemander { return; } - synchronized (fut) { + synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation. 
if (fut.isDone()) return; @@ -443,7 +420,7 @@ public class GridDhtPartitionDemander { int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); + final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); for (int cnt = 0; cnt < lsnrCnt; cnt++) sParts.add(new HashSet<Integer>()); @@ -458,26 +435,50 @@ public class GridDhtPartitionDemander { for (cnt = 0; cnt < lsnrCnt; cnt++) { if (!sParts.get(cnt).isEmpty()) { // Create copy. - GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt)); + final GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt)); initD.topic(rebalanceTopics.get(cnt)); initD.updateSequence(fut.updateSeq); initD.timeout(cctx.config().getRebalanceTimeout()); - synchronized (fut) { - if (fut.isDone()) - return;// Future can be already cancelled at this moment and all failovers happened. + final int finalCnt = cnt; - // New requests will not be covered by failovers. - cctx.io().sendOrderedMessage(node, - rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); - } + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + if (!fut.isDone()) { + cctx.io().sendOrderedMessage(node, + rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout()); + + // Cleanup required in case partitions demanded in parallel with cancellation. + synchronized (fut) { + if (fut.isDone()) + fut.cleanupRemoteContexts(node.id()); + } + + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + finalCnt + ", partitions count=" + sParts.get(finalCnt).size() + + " (" + partitionsList(sParts.get(finalCnt)) + ")]"); + } + } + catch (IgniteCheckedException e) { + ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class); + if (cause != null) + log.warning("Failed to send initial demand request to node. 
" + e.getMessage()); + else + log.error("Failed to send initial demand request to node.", e); - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - cnt + ", partitions count=" + sParts.get(cnt).size() + - " (" + partitionsList(sParts.get(cnt)) + ")]"); + fut.cancel(); + } + catch (Throwable th) { + log.error("Runtime error caught during initial demand request sending.", th); + + fut.cancel(); + } + } + }, /*system pool*/true); } } }
