IGNITE-5232 GridDhtPartitionDemander.requestPartitions invokes sendMessages sequentially, which leads to a significant increase in node start time on large clusters with SSL
(cherry picked from commit 4c460b78f7b0febc37940c8d65f91cb449fa4d54) # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e5c9969 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e5c9969 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e5c9969 Branch: refs/heads/ignite-5232-1.7.2 Commit: 2e5c9969ca69fccc62cb57b9e1cf0ecb584b0b83 Parents: b1736c0 Author: Anton Vinogradov <[email protected]> Authored: Wed May 31 15:27:33 2017 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Wed May 31 15:27:33 2017 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 131 ++++++++++--------- 1 file changed, 71 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2e5c9969/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index a6808c7..daae1e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -331,41 +331,21 @@ public class GridDhtPartitionDemander { return new Runnable() { @Override public void run() { - try { - if (next != null) - 
fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> f) { - try { - if (f.get()) // Not cancelled. - next.run(); // Starts next cache rebalancing (according to the order). - } - catch (IgniteCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug(ignored.getMessage()); - } + if (next != null) + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> f) { + try { + if (f.get()) // Not cancelled. + next.run(); // Starts next cache rebalancing (according to the order). } - }); - - requestPartitions(fut, assigns); - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class); - - if (cause != null) - log.warning("Failed to send initial demand request to node. " + e.getMessage()); - else - log.error("Failed to send initial demand request to node.", e); - - fut.cancel(); - } - catch (Throwable th) { - log.error("Runtime error caught during initial demand request sending.", th); - - fut.cancel(); + catch (IgniteCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug(ignored.getMessage()); + } + } + }); - if (th instanceof Error) - throw th; - } + requestPartitions(fut, assigns); } }; } @@ -404,9 +384,8 @@ public class GridDhtPartitionDemander { * @return Partitions were requested. 
*/ private void requestPartitions( - RebalanceFuture fut, - GridDhtPreloaderAssignments assigns - ) throws IgniteCheckedException { + final RebalanceFuture fut, + GridDhtPreloaderAssignments assigns){ if (topologyChanged(fut)) { fut.cancel(); @@ -438,7 +417,7 @@ public class GridDhtPartitionDemander { int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); + final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); for (int cnt = 0; cnt < lsnrCnt; cnt++) sParts.add(new HashSet<Integer>()); @@ -453,42 +432,74 @@ public class GridDhtPartitionDemander { for (cnt = 0; cnt < lsnrCnt; cnt++) { if (!sParts.get(cnt).isEmpty()) { // Create copy. - GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); + final GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); initD.topic(rebalanceTopics.get(cnt)); initD.updateSequence(fut.updateSeq); initD.timeout(cctx.config().getRebalanceTimeout()); - synchronized (fut) { - if (!fut.isDone()) { - // Future can be already cancelled at this moment and all failovers happened. - // New requests will not be covered by failovers. - cctx.io().sendOrderedMessage(node, - rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); - } - } + final int finalCnt = cnt; - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - cnt + ", partitions count=" + sParts.get(cnt).size() + - " (" + partitionsList(sParts.get(cnt)) + ")]"); + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + if (!fut.isDone()) { + cctx.io().sendOrderedMessage(node, + rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout()); + + // Cleanup required in case partitions demanded in parallel with cancellation. 
+ synchronized (fut) { + if (fut.isDone()) + fut.cleanupRemoteContexts(node.id()); + } + + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + finalCnt + ", partitions count=" + sParts.get(finalCnt).size() + + " (" + partitionsList(sParts.get(finalCnt)) + ")]"); + } + } + catch (IgniteCheckedException e) { + ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class); + + if (cause != null) + log.warning("Failed to send initial demand request to node. " + e.getMessage()); + else + log.error("Failed to send initial demand request to node.", e); + + fut.cancel(); + } + catch (Throwable th) { + log.error("Runtime error caught during initial demand request sending.", th); + + fut.cancel(); + } + } + }, /*system pool*/true); } } } else { - U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + - ", mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + - ", partitionsCount=" + parts.size() + - ", topology=" + fut.topologyVersion() + - ", updateSeq=" + fut.updateSeq + "]"); + try { + U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + + ", mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + + ", updateSeq=" + fut.updateSeq + "]"); - d.timeout(cctx.config().getRebalanceTimeout()); - d.workerId(0);//old api support. + d.timeout(cctx.config().getRebalanceTimeout()); + d.workerId(0);//old api support. + + worker = new DemandWorker(dmIdx.incrementAndGet(), fut); - worker = new DemandWorker(dmIdx.incrementAndGet(), fut); + worker.run(node, d); + } + catch (Throwable th) { + log.error("Runtime error caught during initial demand request sending.", th); - worker.run(node, d); + fut.cancel(); + } } } }
