WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92524a44 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92524a44 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92524a44 Branch: refs/heads/ignite-4565-ddl Commit: 92524a44eb8ccfb43901557f23368fc325e46c59 Parents: 23f67a5 Author: devozerov <[email protected]> Authored: Fri Mar 17 16:04:37 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Mar 17 16:04:37 2017 +0300 ---------------------------------------------------------------------- .../cache/CachePartitionExchangeWorker.java | 355 ------------------- .../cache/CachePartitionExchangeWorkerTask.java | 29 ++ .../GridCachePartitionExchangeManager.java | 335 ++++++++++++++++- .../GridDhtPartitionsExchangeFuture.java | 8 +- 4 files changed, 357 insertions(+), 370 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java deleted file mode 100644 index 98a9cc0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; - -/** - * Exchange future thread. All exchanges happen only by one thread and next - * exchange will not start until previous one completes. - */ -public class CachePartitionExchangeWorker<K, V> extends GridWorker { - /** Cache context. */ - private final GridCacheSharedContext<K, V> cctx; - - /** Exchange manager. */ - private final GridCachePartitionExchangeManager<K, V> exchMgr; - - /** Future queue. */ - private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ = - new LinkedBlockingDeque<>(); - - /** Busy flag used as performance optimization to stop current preloading. */ - private volatile boolean busy; - - /** - * Constructor. - * - * @param exchMgr Exchange manager. - * @param log Logger. - */ - public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, V> exchMgr, IgniteLogger log) { - super(exchMgr.context().igniteInstanceName(), "partition-exchanger", log); - - this.cctx = exchMgr.context(); - - this.exchMgr = exchMgr; - } - - /** - * Add first exchange future. - * - * @param fut Future. - */ - public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) { - futQ.addFirst(fut); - } - - /** - * @param exchFut Exchange future. - */ - void addFuture(GridDhtPartitionsExchangeFuture exchFut) { - assert exchFut != null; - - if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy)) - futQ.offer(exchFut); - - if (log.isDebugEnabled()) - log.debug("Added exchange future to exchange worker: " + exchFut); - } - - /** - * Dump debug info. - */ - public void dumpFuturesDebugInfo() { - U.warn(log, "Pending exchange futures:"); - - for (GridDhtPartitionsExchangeFuture fut : futQ) - U.warn(log, ">>> " + fut); - } - - /** - * @return {@code True} iif exchange queue is empty. - */ - public boolean exchangeQueueIsEmpty() { - return futQ.isEmpty(); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - long timeout = cctx.gridConfig().getNetworkTimeout(); - - int cnt = 0; - - while (!isCancelled()) { - GridDhtPartitionsExchangeFuture exchFut = null; - - cnt++; - - try { - boolean preloadFinished = true; - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone(); - - if (!preloadFinished) - break; - } - - // If not first preloading and no more topology events present. - if (!cctx.kernalContext().clientNode() && exchangeQueueIsEmpty() && preloadFinished) - timeout = cctx.gridConfig().getNetworkTimeout(); - - // After workers line up and before preloading starts we initialize all futures. - if (log.isDebugEnabled()) { - Collection<IgniteInternalFuture> unfinished = new HashSet<>(); - - for (GridDhtPartitionsExchangeFuture fut : exchMgr.exchangeFutures()) { - if (!fut.isDone()) - unfinished.add(fut); - } - - log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']'); - } - - // Take next exchange future. - if (isCancelled()) - Thread.currentThread().interrupt(); - - exchFut = futQ.poll(timeout, MILLISECONDS); - - if (exchFut == null) - continue; // Main while loop. - - busy = true; - - Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; - - boolean dummyReassign = exchFut.dummyReassign(); - boolean forcePreload = exchFut.forcePreload(); - - try { - if (isCancelled()) - break; - - if (!exchFut.dummy() && !exchFut.forcePreload()) { - exchMgr.lastTopologyFuture(exchFut); - - exchFut.init(); - - int dumpedObjects = 0; - - while (true) { - try { - exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); - - break; - } - catch (IgniteFutureTimeoutCheckedException ignored) { - U.warn(log, "Failed to wait for partition map exchange [" + - "topVer=" + exchFut.topologyVersion() + - ", node=" + cctx.localNodeId() + "]. " + - "Dumping pending objects that might be the cause: "); - - if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { - try { - exchMgr.dumpDebugInfo(exchFut.topologyVersion()); - } - catch (Exception e) { - U.error(log, "Failed to dump debug information: " + e, e); - } - - if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) - U.dumpThreads(log); - - dumpedObjects++; - } - } - } - - - if (log.isDebugEnabled()) - log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + - this + ']'); - - boolean changed = false; - - // Just pick first worker to do this, so we don't - // invoke topology callback more than once for the - // same event. - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) - continue; - - changed |= cacheCtx.topology().afterExchange(exchFut); - } - - if (!cctx.kernalContext().clientNode() && changed && exchangeQueueIsEmpty()) - exchMgr.refreshPartitions(); - } - else { - if (log.isDebugEnabled()) - log.debug("Got dummy exchange (will reassign)"); - - if (!dummyReassign) { - timeout = 0; // Force refresh. - - continue; - } - } - - if (!exchFut.skipPreload()) { - assignsMap = new HashMap<>(); - - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - long delay = cacheCtx.config().getRebalanceDelay(); - - GridDhtPreloaderAssignments assigns = null; - - // Don't delay for dummy reassigns to avoid infinite recursion. - if (delay == 0 || forcePreload) - assigns = cacheCtx.preloader().assign(exchFut); - - assignsMap.put(cacheCtx.cacheId(), assigns); - } - } - } - finally { - // Must flip busy flag before assignments are given to demand workers. - busy = false; - } - - if (assignsMap != null) { - int size = assignsMap.size(); - - NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); - - for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { - int cacheId = e.getKey(); - - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - int order = cacheCtx.config().getRebalanceOrder(); - - if (orderMap.get(order) == null) - orderMap.put(order, new ArrayList<Integer>(size)); - - orderMap.get(order).add(cacheId); - } - - Runnable r = null; - - List<String> rebList = new LinkedList<>(); - - boolean assignsCancelled = false; - - for (Integer order : orderMap.descendingKeySet()) { - for (Integer cacheId : orderMap.get(order)) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); - - if (assigns != null) - assignsCancelled |= assigns.cancelled(); - - // Cancels previous rebalance future (in case it's not done yet). - // Sends previous rebalance stopped event (if necessary). - // Creates new rebalance future. - // Sends current rebalance started event (if necessary). - // Finishes cache sync future (on empty assignments). - Runnable cur = cacheCtx.preloader().addAssignments(assigns, - forcePreload, - cnt, - r, - exchFut.forcedRebalanceFuture()); - - if (cur != null) { - rebList.add(U.maskName(cacheCtx.name())); - - r = cur; - } - } - } - - if (assignsCancelled) { // Pending exchange. - U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } - else if (r != null) { - Collections.reverse(rebList); - - U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); - - if (exchangeQueueIsEmpty()) { - U.log(log, "Rebalancing started " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - - r.run(); // Starts rebalancing routine. - } - else - U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } - else - U.log(log, "Skipping rebalancing (nothing scheduled) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); - } - } - catch (IgniteInterruptedCheckedException e) { - throw e; - } - catch (IgniteClientDisconnectedCheckedException ignored) { - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to wait for completion of partition map exchange " + - "(preloading will not start): " + exchFut, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java new file mode 100644 index 0000000..80ef9f5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Cache partition exchange worker task marker interface. + */ +public interface CachePartitionExchangeWorkerTask { + /** + * @return {@code True) if task denotes standard exchange task, {@code false} if this is a custom task which + * must be executed from within exchange thread. + */ + boolean isExchange(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f9222bc..444b530 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -21,15 +21,24 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -64,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -80,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; @@ -87,6 +98,7 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getLong; @@ -114,9 +126,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); + /** Last partition refresh. */ + private final AtomicLong lastRefresh = new AtomicLong(-1); + /** */ @GridToStringInclude - private CachePartitionExchangeWorker exchWorker; + private ExchangeWorker exchWorker; /** */ @GridToStringExclude @@ -282,7 +297,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); - exchWorker = new CachePartitionExchangeWorker<>(this, log); + exchWorker = new ExchangeWorker(); cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); @@ -582,13 +597,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param lastInitializedFut Last completed topology future. - */ - public void lastTopologyFuture(GridDhtPartitionsExchangeFuture lastInitializedFut) { - this.lastInitializedFut = lastInitializedFut; - } - - /** * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ @@ -676,7 +684,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if pending future queue is empty. */ public boolean hasPendingExchange() { - return !exchWorker.exchangeQueueIsEmpty(); + return exchWorker.hasPendingExchange(); } /** @@ -731,7 +739,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * Partition refresh callback. */ - public void refreshPartitions() { + private void refreshPartitions() { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { @@ -811,8 +819,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Nullable GridCacheVersion lastVer, boolean compress) { GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, - lastVer, - exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); + lastVer, + exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); boolean useOldApi = false; @@ -1337,7 +1345,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Last exchange future: " + lastInitializedFut); - exchWorker.dumpFuturesDebugInfo(); + exchWorker.dumpExchangeDebugInfo(); if (!readyFuts.isEmpty()) { U.warn(log, "Pending affinity ready futures:"); @@ -1554,6 +1562,305 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Exchange future thread. All exchanges happen only by one thread and next + * exchange will not start until previous one completes. + */ + private class ExchangeWorker extends GridWorker { + /** Future queue. */ + private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ = + new LinkedBlockingDeque<>(); + + /** Busy flag used as performance optimization to stop current preloading. */ + private volatile boolean busy; + + /** + * Constructor. + */ + private ExchangeWorker() { + super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log); + } + + /** + * Add first exchange future. + * + * @param exchFut Exchange future. + */ + void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) { + futQ.addFirst(exchFut); + } + + /** + * @param exchFut Exchange future. + */ + void addFuture(GridDhtPartitionsExchangeFuture exchFut) { + assert exchFut != null; + + if (!exchFut.dummy() || (!hasPendingExchange() && !busy)) + futQ.offer(exchFut); + + if (log.isDebugEnabled()) + log.debug("Added exchange future to exchange worker: " + exchFut); + } + + /** + * @return Whether pending exchange future exists. + */ + boolean hasPendingExchange() { + return !futQ.isEmpty(); + } + + /** + * Dump debug info. + */ + void dumpExchangeDebugInfo() { + U.warn(log, "Pending exchange futures:"); + + for (GridDhtPartitionsExchangeFuture fut : futQ) + U.warn(log, ">>> " + fut); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + long timeout = cctx.gridConfig().getNetworkTimeout(); + + int cnt = 0; + + while (!isCancelled()) { + GridDhtPartitionsExchangeFuture exchFut = null; + + cnt++; + + try { + boolean preloadFinished = true; + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone(); + + if (!preloadFinished) + break; + } + + // If not first preloading and no more topology events present. + if (!cctx.kernalContext().clientNode() && !hasPendingExchange() && preloadFinished) + timeout = cctx.gridConfig().getNetworkTimeout(); + + // After workers line up and before preloading starts we initialize all futures. + if (log.isDebugEnabled()) { + Collection<IgniteInternalFuture> unfinished = new HashSet<>(); + + for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) { + if (!fut.isDone()) + unfinished.add(fut); + } + + log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']'); + } + + // Take next exchange future. + if (isCancelled()) + Thread.currentThread().interrupt(); + + exchFut = futQ.poll(timeout, MILLISECONDS); + + if (exchFut == null) + continue; // Main while loop. + + busy = true; + + Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; + + boolean dummyReassign = exchFut.dummyReassign(); + boolean forcePreload = exchFut.forcePreload(); + + try { + if (isCancelled()) + break; + + if (!exchFut.dummy() && !exchFut.forcePreload()) { + lastInitializedFut = exchFut; + + exchFut.init(); + + int dumpedObjects = 0; + + while (true) { + try { + exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + U.warn(log, "Failed to wait for partition map exchange [" + + "topVer=" + exchFut.topologyVersion() + + ", node=" + cctx.localNodeId() + "]. " + + "Dumping pending objects that might be the cause: "); + + if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { + try { + dumpDebugInfo(exchFut.topologyVersion()); + } + catch (Exception e) { + U.error(log, "Failed to dump debug information: " + e, e); + } + + if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) + U.dumpThreads(log); + + dumpedObjects++; + } + } + } + + + if (log.isDebugEnabled()) + log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + + this + ']'); + + if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId())) + lastRefresh.compareAndSet(-1, U.currentTimeMillis()); + + boolean changed = false; + + // Just pick first worker to do this, so we don't + // invoke topology callback more than once for the + // same event. + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + changed |= cacheCtx.topology().afterExchange(exchFut); + } + + if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange()) + refreshPartitions(); + } + else { + if (log.isDebugEnabled()) + log.debug("Got dummy exchange (will reassign)"); + + if (!dummyReassign) { + timeout = 0; // Force refresh. + + continue; + } + } + + if (!exchFut.skipPreload()) { + assignsMap = new HashMap<>(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + long delay = cacheCtx.config().getRebalanceDelay(); + + GridDhtPreloaderAssignments assigns = null; + + // Don't delay for dummy reassigns to avoid infinite recursion. + if (delay == 0 || forcePreload) + assigns = cacheCtx.preloader().assign(exchFut); + + assignsMap.put(cacheCtx.cacheId(), assigns); + } + } + } + finally { + // Must flip busy flag before assignments are given to demand workers. + busy = false; + } + + if (assignsMap != null) { + int size = assignsMap.size(); + + NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); + + for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { + int cacheId = e.getKey(); + + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + int order = cacheCtx.config().getRebalanceOrder(); + + if (orderMap.get(order) == null) + orderMap.put(order, new ArrayList<Integer>(size)); + + orderMap.get(order).add(cacheId); + } + + Runnable r = null; + + List<String> rebList = new LinkedList<>(); + + boolean assignsCancelled = false; + + for (Integer order : orderMap.descendingKeySet()) { + for (Integer cacheId : orderMap.get(order)) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); + + if (assigns != null) + assignsCancelled |= assigns.cancelled(); + + // Cancels previous rebalance future (in case it's not done yet). + // Sends previous rebalance stopped event (if necessary). + // Creates new rebalance future. + // Sends current rebalance started event (if necessary). + // Finishes cache sync future (on empty assignments). + Runnable cur = cacheCtx.preloader().addAssignments(assigns, + forcePreload, + cnt, + r, + exchFut.forcedRebalanceFuture()); + + if (cur != null) { + rebList.add(U.maskName(cacheCtx.name())); + + r = cur; + } + } + } + + if (assignsCancelled) { // Pending exchange. + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + else if (r != null) { + Collections.reverse(rebList); + + U.log(log, "Rebalancing scheduled [order=" + rebList + "]"); + + if (!hasPendingExchange()) { + U.log(log, "Rebalancing started " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + + r.run(); // Starts rebalancing routine. + } + else + U.log(log, "Skipping rebalancing (obsolete exchange ID) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + else + U.log(log, "Skipping rebalancing (nothing scheduled) " + + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + } + } + catch (IgniteInterruptedCheckedException e) { + throw e; + } + catch (IgniteClientDisconnectedCheckedException ignored) { + return; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to wait for completion of partition map exchange " + + "(preloading will not start): " + exchFut, e); + } + } + } + } + + /** * Partition resend timeout object. */ private class ResendTimeoutObject implements GridTimeoutObject { http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 46fb144..50937a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -84,7 +85,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS * Future for exchanging partition maps. */ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask { /** */ public static final int DUMP_PENDING_OBJECTS_THRESHOLD = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); @@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** {@inheritDoc} */ + @Override public boolean isExchange() { + return true; + } + + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { return exchId.compareTo(fut.exchId); }
