Repository: ignite Updated Branches: refs/heads/ignite-4565-ddl 70a11912b -> 4538526d8
Moved exchange worker to separate class. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23f67a5a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23f67a5a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23f67a5a Branch: refs/heads/ignite-4565-ddl Commit: 23f67a5a1b8e6c7429d110df858c8bdd17a913d7 Parents: 9020d12 Author: devozerov <[email protected]> Authored: Fri Mar 17 15:44:57 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Mar 17 15:44:57 2017 +0300 ---------------------------------------------------------------------- .../cache/CachePartitionExchangeWorker.java | 355 +++++++++++++++++++ .../GridCachePartitionExchangeManager.java | 329 +---------------- 2 files changed, 368 insertions(+), 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java new file mode 100644 index 0000000..98a9cc0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; + +/** + * Exchange future thread. All exchanges happen only by one thread and next + * exchange will not start until previous one completes. 
+ */ +public class CachePartitionExchangeWorker<K, V> extends GridWorker { + /** Cache context. */ + private final GridCacheSharedContext<K, V> cctx; + + /** Exchange manager. */ + private final GridCachePartitionExchangeManager<K, V> exchMgr; + + /** Future queue. */ + private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ = + new LinkedBlockingDeque<>(); + + /** Busy flag used as performance optimization to stop current preloading. */ + private volatile boolean busy; + + /** + * Constructor. + * + * @param exchMgr Exchange manager. + * @param log Logger. + */ + public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, V> exchMgr, IgniteLogger log) { + super(exchMgr.context().igniteInstanceName(), "partition-exchanger", log); + + this.cctx = exchMgr.context(); + + this.exchMgr = exchMgr; + } + + /** + * Add first exchange future. + * + * @param fut Future. + */ + public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) { + futQ.addFirst(fut); + } + + /** + * @param exchFut Exchange future. + */ + void addFuture(GridDhtPartitionsExchangeFuture exchFut) { + assert exchFut != null; + + if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy)) + futQ.offer(exchFut); + + if (log.isDebugEnabled()) + log.debug("Added exchange future to exchange worker: " + exchFut); + } + + /** + * Dump debug info. + */ + public void dumpFuturesDebugInfo() { + U.warn(log, "Pending exchange futures:"); + + for (GridDhtPartitionsExchangeFuture fut : futQ) + U.warn(log, ">>> " + fut); + } + + /** + * @return {@code True} if exchange queue is empty. 
+ */ + public boolean exchangeQueueIsEmpty() { + return futQ.isEmpty(); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + long timeout = cctx.gridConfig().getNetworkTimeout(); + + int cnt = 0; + + while (!isCancelled()) { + GridDhtPartitionsExchangeFuture exchFut = null; + + cnt++; + + try { + boolean preloadFinished = true; + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone(); + + if (!preloadFinished) + break; + } + + // If not first preloading and no more topology events present. + if (!cctx.kernalContext().clientNode() && exchangeQueueIsEmpty() && preloadFinished) + timeout = cctx.gridConfig().getNetworkTimeout(); + + // After workers line up and before preloading starts we initialize all futures. + if (log.isDebugEnabled()) { + Collection<IgniteInternalFuture> unfinished = new HashSet<>(); + + for (GridDhtPartitionsExchangeFuture fut : exchMgr.exchangeFutures()) { + if (!fut.isDone()) + unfinished.add(fut); + } + + log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']'); + } + + // Take next exchange future. + if (isCancelled()) + Thread.currentThread().interrupt(); + + exchFut = futQ.poll(timeout, MILLISECONDS); + + if (exchFut == null) + continue; // Main while loop. 
+ + busy = true; + + Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; + + boolean dummyReassign = exchFut.dummyReassign(); + boolean forcePreload = exchFut.forcePreload(); + + try { + if (isCancelled()) + break; + + if (!exchFut.dummy() && !exchFut.forcePreload()) { + exchMgr.lastTopologyFuture(exchFut); + + exchFut.init(); + + int dumpedObjects = 0; + + while (true) { + try { + exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + U.warn(log, "Failed to wait for partition map exchange [" + + "topVer=" + exchFut.topologyVersion() + + ", node=" + cctx.localNodeId() + "]. " + + "Dumping pending objects that might be the cause: "); + + if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { + try { + exchMgr.dumpDebugInfo(exchFut.topologyVersion()); + } + catch (Exception e) { + U.error(log, "Failed to dump debug information: " + e, e); + } + + if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) + U.dumpThreads(log); + + dumpedObjects++; + } + } + } + + + if (log.isDebugEnabled()) + log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + + this + ']'); + + boolean changed = false; + + // Just pick first worker to do this, so we don't + // invoke topology callback more than once for the + // same event. + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + changed |= cacheCtx.topology().afterExchange(exchFut); + } + + if (!cctx.kernalContext().clientNode() && changed && exchangeQueueIsEmpty()) + exchMgr.refreshPartitions(); + } + else { + if (log.isDebugEnabled()) + log.debug("Got dummy exchange (will reassign)"); + + if (!dummyReassign) { + timeout = 0; // Force refresh. 
+ + continue; + } + } + + if (!exchFut.skipPreload()) { + assignsMap = new HashMap<>(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + long delay = cacheCtx.config().getRebalanceDelay(); + + GridDhtPreloaderAssignments assigns = null; + + // Don't delay for dummy reassigns to avoid infinite recursion. + if (delay == 0 || forcePreload) + assigns = cacheCtx.preloader().assign(exchFut); + + assignsMap.put(cacheCtx.cacheId(), assigns); + } + } + } + finally { + // Must flip busy flag before assignments are given to demand workers. + busy = false; + } + + if (assignsMap != null) { + int size = assignsMap.size(); + + NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); + + for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { + int cacheId = e.getKey(); + + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + int order = cacheCtx.config().getRebalanceOrder(); + + if (orderMap.get(order) == null) + orderMap.put(order, new ArrayList<Integer>(size)); + + orderMap.get(order).add(cacheId); + } + + Runnable r = null; + + List<String> rebList = new LinkedList<>(); + + boolean assignsCancelled = false; + + for (Integer order : orderMap.descendingKeySet()) { + for (Integer cacheId : orderMap.get(order)) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); + + if (assigns != null) + assignsCancelled |= assigns.cancelled(); + + // Cancels previous rebalance future (in case it's not done yet). + // Sends previous rebalance stopped event (if necessary). + // Creates new rebalance future. + // Sends current rebalance started event (if necessary). + // Finishes cache sync future (on empty assignments). 
+ Runnable cur = cacheCtx.preloader().addAssignments(assigns, + forcePreload, + cnt, + r, + exchFut.forcedRebalanceFuture()); + + if (cur != null) { + rebList.add(U.maskName(cacheCtx.name())); + + r = cur; + } + } + } + + if (assignsCancelled) { // Pending exchange. + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + else if (r != null) { + Collections.reverse(rebList); + + U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); + + if (exchangeQueueIsEmpty()) { + U.log(log, "Rebalancing started " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + + r.run(); // Starts rebalancing routine. + } + else + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + else + U.log(log, "Skipping rebalancing (nothing scheduled) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + } + catch (IgniteInterruptedCheckedException e) { + throw e; + } + catch (IgniteClientDisconnectedCheckedException ignored) { + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to wait for completion of partition map exchange " + + "(preloading will not start): " + exchFut, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3e72efb..f9222bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,24 +21,15 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -73,7 +64,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -90,7 +80,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; @@ -98,7 +87,6 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getLong; @@ -126,12 +114,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); - /** Last partition refresh. 
*/ - private final AtomicLong lastRefresh = new AtomicLong(-1); - /** */ @GridToStringInclude - private ExchangeWorker exchWorker; + private CachePartitionExchangeWorker exchWorker; /** */ @GridToStringExclude @@ -297,7 +282,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); - exchWorker = new ExchangeWorker(); + exchWorker = new CachePartitionExchangeWorker<>(this, log); cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); @@ -369,7 +354,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); - exchWorker.futQ.addFirst(fut); + exchWorker.addFirstFuture(fut); if (!cctx.kernalContext().clientNode()) { for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { @@ -597,6 +582,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param lastInitializedFut Last completed topology future. + */ + public void lastTopologyFuture(GridDhtPartitionsExchangeFuture lastInitializedFut) { + this.lastInitializedFut = lastInitializedFut; + } + + /** * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ @@ -684,7 +676,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if pending future queue is empty. */ public boolean hasPendingExchange() { - return !exchWorker.futQ.isEmpty(); + return !exchWorker.exchangeQueueIsEmpty(); } /** @@ -739,7 +731,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * Partition refresh callback. 
*/ - private void refreshPartitions() { + public void refreshPartitions() { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { @@ -1345,10 +1337,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Last exchange future: " + lastInitializedFut); - U.warn(log, "Pending exchange futures:"); - - for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ) - U.warn(log, ">>> " + fut); + exchWorker.dumpFuturesDebugInfo(); if (!readyFuts.isEmpty()) { U.warn(log, "Pending affinity ready futures:"); @@ -1547,28 +1536,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param deque Deque to poll from. - * @param time Time to wait. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. - if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(time, MILLISECONDS); - } - - /** * @param node Target node. * @return {@code True} if can use compression for partition map messages. */ @@ -1587,276 +1554,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * Exchange future thread. All exchanges happen only by one thread and next - * exchange will not start until previous one completes. 
- */ - private class ExchangeWorker extends GridWorker { - /** Future queue. */ - private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ = - new LinkedBlockingDeque<>(); - - /** Busy flag used as performance optimization to stop current preloading. */ - private volatile boolean busy; - - /** - * - */ - private ExchangeWorker() { - super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log); - } - - /** - * @param exchFut Exchange future. - */ - void addFuture(GridDhtPartitionsExchangeFuture exchFut) { - assert exchFut != null; - - if (!exchFut.dummy() || (futQ.isEmpty() && !busy)) - futQ.offer(exchFut); - - if (log.isDebugEnabled()) - log.debug("Added exchange future to exchange worker: " + exchFut); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - long timeout = cctx.gridConfig().getNetworkTimeout(); - - int cnt = 0; - - while (!isCancelled()) { - GridDhtPartitionsExchangeFuture exchFut = null; - - cnt++; - - try { - boolean preloadFinished = true; - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone(); - - if (!preloadFinished) - break; - } - - // If not first preloading and no more topology events present. - if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) - timeout = cctx.gridConfig().getNetworkTimeout(); - - // After workers line up and before preloading starts we initialize all futures. - if (log.isDebugEnabled()) { - Collection<IgniteInternalFuture> unfinished = new HashSet<>(); - - for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) { - if (!fut.isDone()) - unfinished.add(fut); - } - - log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']'); - } - - // Take next exchange future. 
- exchFut = poll(futQ, timeout, this); - - if (exchFut == null) - continue; // Main while loop. - - busy = true; - - Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; - - boolean dummyReassign = exchFut.dummyReassign(); - boolean forcePreload = exchFut.forcePreload(); - - try { - if (isCancelled()) - break; - - if (!exchFut.dummy() && !exchFut.forcePreload()) { - lastInitializedFut = exchFut; - - exchFut.init(); - - int dumpedObjects = 0; - - while (true) { - try { - exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); - - break; - } - catch (IgniteFutureTimeoutCheckedException ignored) { - U.warn(log, "Failed to wait for partition map exchange [" + - "topVer=" + exchFut.topologyVersion() + - ", node=" + cctx.localNodeId() + "]. " + - "Dumping pending objects that might be the cause: "); - - if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { - try { - dumpDebugInfo(exchFut.topologyVersion()); - } - catch (Exception e) { - U.error(log, "Failed to dump debug information: " + e, e); - } - - if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) - U.dumpThreads(log); - - dumpedObjects++; - } - } - } - - - if (log.isDebugEnabled()) - log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + - this + ']'); - - if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId())) - lastRefresh.compareAndSet(-1, U.currentTimeMillis()); - - boolean changed = false; - - // Just pick first worker to do this, so we don't - // invoke topology callback more than once for the - // same event. 
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) - continue; - - changed |= cacheCtx.topology().afterExchange(exchFut); - } - - if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) - refreshPartitions(); - } - else { - if (log.isDebugEnabled()) - log.debug("Got dummy exchange (will reassign)"); - - if (!dummyReassign) { - timeout = 0; // Force refresh. - - continue; - } - } - - if (!exchFut.skipPreload()) { - assignsMap = new HashMap<>(); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - long delay = cacheCtx.config().getRebalanceDelay(); - - GridDhtPreloaderAssignments assigns = null; - - // Don't delay for dummy reassigns to avoid infinite recursion. - if (delay == 0 || forcePreload) - assigns = cacheCtx.preloader().assign(exchFut); - - assignsMap.put(cacheCtx.cacheId(), assigns); - } - } - } - finally { - // Must flip busy flag before assignments are given to demand workers. - busy = false; - } - - if (assignsMap != null) { - int size = assignsMap.size(); - - NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); - - for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { - int cacheId = e.getKey(); - - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - int order = cacheCtx.config().getRebalanceOrder(); - - if (orderMap.get(order) == null) - orderMap.put(order, new ArrayList<Integer>(size)); - - orderMap.get(order).add(cacheId); - } - - Runnable r = null; - - List<String> rebList = new LinkedList<>(); - - boolean assignsCancelled = false; - - for (Integer order : orderMap.descendingKeySet()) { - for (Integer cacheId : orderMap.get(order)) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); - - if (assigns != null) - assignsCancelled |= assigns.cancelled(); - - // Cancels previous rebalance future (in case it's not done yet). 
- // Sends previous rebalance stopped event (if necessary). - // Creates new rebalance future. - // Sends current rebalance started event (if necessary). - // Finishes cache sync future (on empty assignments). - Runnable cur = cacheCtx.preloader().addAssignments(assigns, - forcePreload, - cnt, - r, - exchFut.forcedRebalanceFuture()); - - if (cur != null) { - rebList.add(U.maskName(cacheCtx.name())); - - r = cur; - } - } - } - - if (assignsCancelled) { // Pending exchange. - U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } - else if (r != null) { - Collections.reverse(rebList); - - U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); - - if (futQ.isEmpty()) { - U.log(log, "Rebalancing started " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - - r.run(); // Starts rebalancing routine. - } - else - U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } - else - U.log(log, "Skipping rebalancing (nothing scheduled) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } - } - catch (IgniteInterruptedCheckedException e) { - throw e; - } - catch (IgniteClientDisconnectedCheckedException ignored) { - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to wait for completion of partition map exchange " + - "(preloading will not start): " + exchFut, e); - } - } - } - } - - /** * Partition resend timeout object. */ private class ResendTimeoutObject implements GridTimeoutObject {
