Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 70a11912b -> 4538526d8


Moved exchange worker to separate class.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23f67a5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23f67a5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23f67a5a

Branch: refs/heads/ignite-4565-ddl
Commit: 23f67a5a1b8e6c7429d110df858c8bdd17a913d7
Parents: 9020d12
Author: devozerov <[email protected]>
Authored: Fri Mar 17 15:44:57 2017 +0300
Committer: devozerov <[email protected]>
Committed: Fri Mar 17 15:44:57 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorker.java     | 355 +++++++++++++++++++
 .../GridCachePartitionExchangeManager.java      | 329 +----------------
 2 files changed, 368 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
new file mode 100644
index 0000000..98a9cc0
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+
+/**
+ * Exchange future thread. All exchanges are processed by a single thread, and the
+ * next exchange will not start until the previous one completes.
+ */
+public class CachePartitionExchangeWorker<K, V> extends GridWorker {
+    /** Cache context. */
+    private final GridCacheSharedContext<K, V> cctx;
+
+    /** Exchange manager. */
+    private final GridCachePartitionExchangeManager<K, V> exchMgr;
+
+    /** Future queue. */
+    private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+        new LinkedBlockingDeque<>();
+
+    /** Busy flag used as performance optimization to stop current preloading. 
*/
+    private volatile boolean busy;
+
+    /**
+     * Constructor.
+     *
+     * @param exchMgr Exchange manager.
+     * @param log Logger.
+     */
+    public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, 
V> exchMgr, IgniteLogger log) {
+        super(exchMgr.context().igniteInstanceName(), "partition-exchanger", 
log);
+
+        this.cctx = exchMgr.context();
+
+        this.exchMgr = exchMgr;
+    }
+
+    /**
+     * Adds the exchange future to the head of the queue.
+     *
+     * @param fut Future.
+     */
+    public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) {
+        futQ.addFirst(fut);
+    }
+
+    /**
+     * @param exchFut Exchange future.
+     */
+    void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        assert exchFut != null;
+
+        if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy))
+            futQ.offer(exchFut);
+
+        if (log.isDebugEnabled())
+            log.debug("Added exchange future to exchange worker: " + exchFut);
+    }
+
+    /**
+     * Dump debug info.
+     */
+    public void dumpFuturesDebugInfo() {
+        U.warn(log, "Pending exchange futures:");
+
+        for (GridDhtPartitionsExchangeFuture fut : futQ)
+            U.warn(log, ">>> " + fut);
+    }
+
+    /**
+     * @return {@code True} if exchange queue is empty.
+     */
+    public boolean exchangeQueueIsEmpty() {
+        return futQ.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+        long timeout = cctx.gridConfig().getNetworkTimeout();
+
+        int cnt = 0;
+
+        while (!isCancelled()) {
+            GridDhtPartitionsExchangeFuture exchFut = null;
+
+            cnt++;
+
+            try {
+                boolean preloadFinished = true;
+
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    preloadFinished &= cacheCtx.preloader() != null && 
cacheCtx.preloader().syncFuture().isDone();
+
+                    if (!preloadFinished)
+                        break;
+                }
+
+                // If not first preloading and no more topology events present.
+                if (!cctx.kernalContext().clientNode() && 
exchangeQueueIsEmpty() && preloadFinished)
+                    timeout = cctx.gridConfig().getNetworkTimeout();
+
+                // After workers line up and before preloading starts we 
initialize all futures.
+                if (log.isDebugEnabled()) {
+                    Collection<IgniteInternalFuture> unfinished = new 
HashSet<>();
+
+                    for (GridDhtPartitionsExchangeFuture fut : 
exchMgr.exchangeFutures()) {
+                        if (!fut.isDone())
+                            unfinished.add(fut);
+                    }
+
+                    log.debug("Before waiting for exchange futures [futs" + 
unfinished + ", worker=" + this + ']');
+                }
+
+                // Take next exchange future.
+                if (isCancelled())
+                    Thread.currentThread().interrupt();
+
+                exchFut = futQ.poll(timeout, MILLISECONDS);
+
+                if (exchFut == null)
+                    continue; // Main while loop.
+
+                busy = true;
+
+                Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
+
+                boolean dummyReassign = exchFut.dummyReassign();
+                boolean forcePreload = exchFut.forcePreload();
+
+                try {
+                    if (isCancelled())
+                        break;
+
+                    if (!exchFut.dummy() && !exchFut.forcePreload()) {
+                        exchMgr.lastTopologyFuture(exchFut);
+
+                        exchFut.init();
+
+                        int dumpedObjects = 0;
+
+                        while (true) {
+                            try {
+                                exchFut.get(2 * 
cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                                break;
+                            }
+                            catch (IgniteFutureTimeoutCheckedException 
ignored) {
+                                U.warn(log, "Failed to wait for partition map 
exchange [" +
+                                    "topVer=" + exchFut.topologyVersion() +
+                                    ", node=" + cctx.localNodeId() + "]. " +
+                                    "Dumping pending objects that might be the 
cause: ");
+
+                                if (dumpedObjects < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+                                    try {
+                                        
exchMgr.dumpDebugInfo(exchFut.topologyVersion());
+                                    }
+                                    catch (Exception e) {
+                                        U.error(log, "Failed to dump debug 
information: " + e, e);
+                                    }
+
+                                    if 
(IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, 
false))
+                                        U.dumpThreads(log);
+
+                                    dumpedObjects++;
+                                }
+                            }
+                        }
+
+
+                        if (log.isDebugEnabled())
+                            log.debug("After waiting for exchange future 
[exchFut=" + exchFut + ", worker=" +
+                                this + ']');
+
+                        boolean changed = false;
+
+                        // Just pick first worker to do this, so we don't
+                        // invoke topology callback more than once for the
+                        // same event.
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) 
{
+                            if (cacheCtx.isLocal())
+                                continue;
+
+                            changed |= 
cacheCtx.topology().afterExchange(exchFut);
+                        }
+
+                        if (!cctx.kernalContext().clientNode() && changed && 
exchangeQueueIsEmpty())
+                            exchMgr.refreshPartitions();
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Got dummy exchange (will reassign)");
+
+                        if (!dummyReassign) {
+                            timeout = 0; // Force refresh.
+
+                            continue;
+                        }
+                    }
+
+                    if (!exchFut.skipPreload()) {
+                        assignsMap = new HashMap<>();
+
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) 
{
+                            long delay = cacheCtx.config().getRebalanceDelay();
+
+                            GridDhtPreloaderAssignments assigns = null;
+
+                            // Don't delay for dummy reassigns to avoid 
infinite recursion.
+                            if (delay == 0 || forcePreload)
+                                assigns = cacheCtx.preloader().assign(exchFut);
+
+                            assignsMap.put(cacheCtx.cacheId(), assigns);
+                        }
+                    }
+                }
+                finally {
+                    // Must flip busy flag before assignments are given to 
demand workers.
+                    busy = false;
+                }
+
+                if (assignsMap != null) {
+                    int size = assignsMap.size();
+
+                    NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
+
+                    for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : 
assignsMap.entrySet()) {
+                        int cacheId = e.getKey();
+
+                        GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+
+                        int order = cacheCtx.config().getRebalanceOrder();
+
+                        if (orderMap.get(order) == null)
+                            orderMap.put(order, new ArrayList<Integer>(size));
+
+                        orderMap.get(order).add(cacheId);
+                    }
+
+                    Runnable r = null;
+
+                    List<String> rebList = new LinkedList<>();
+
+                    boolean assignsCancelled = false;
+
+                    for (Integer order : orderMap.descendingKeySet()) {
+                        for (Integer cacheId : orderMap.get(order)) {
+                            GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+
+                            GridDhtPreloaderAssignments assigns = 
assignsMap.get(cacheId);
+
+                            if (assigns != null)
+                                assignsCancelled |= assigns.cancelled();
+
+                            // Cancels previous rebalance future (in case it's 
not done yet).
+                            // Sends previous rebalance stopped event (if 
necessary).
+                            // Creates new rebalance future.
+                            // Sends current rebalance started event (if 
necessary).
+                            // Finishes cache sync future (on empty 
assignments).
+                            Runnable cur = 
cacheCtx.preloader().addAssignments(assigns,
+                                forcePreload,
+                                cnt,
+                                r,
+                                exchFut.forcedRebalanceFuture());
+
+                            if (cur != null) {
+                                rebList.add(U.maskName(cacheCtx.name()));
+
+                                r = cur;
+                            }
+                        }
+                    }
+
+                    if (assignsCancelled) { // Pending exchange.
+                        U.log(log, "Skipping rebalancing (obsolete exchange 
ID) " +
+                            "[top=" + exchFut.topologyVersion() + ", evt=" + 
exchFut.discoveryEvent().name() +
+                            ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                    }
+                    else if (r != null) {
+                        Collections.reverse(rebList);
+
+                        U.log(log, "Rebalancing scheduled [order=" + rebList + 
"]");
+
+                        if (exchangeQueueIsEmpty()) {
+                            U.log(log, "Rebalancing started " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
+                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+
+                            r.run(); // Starts rebalancing routine.
+                        }
+                        else
+                            U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
+                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                    }
+                    else
+                        U.log(log, "Skipping rebalancing (nothing scheduled) " 
+
+                            "[top=" + exchFut.topologyVersion() + ", evt=" + 
exchFut.discoveryEvent().name() +
+                            ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw e;
+            }
+            catch (IgniteClientDisconnectedCheckedException ignored) {
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to wait for completion of partition map 
exchange " +
+                    "(preloading will not start): " + exchFut, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e72efb..f9222bc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,24 +21,15 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,7 +64,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -90,7 +80,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
@@ -98,7 +87,6 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -126,12 +114,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
-    /** Last partition refresh. */
-    private final AtomicLong lastRefresh = new AtomicLong(-1);
-
     /** */
     @GridToStringInclude
-    private ExchangeWorker exchWorker;
+    private CachePartitionExchangeWorker exchWorker;
 
     /** */
     @GridToStringExclude
@@ -297,7 +282,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        exchWorker = new ExchangeWorker();
+        exchWorker = new CachePartitionExchangeWorker<>(this, log);
 
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, 
EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
@@ -369,7 +354,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.futQ.addFirst(fut);
+        exchWorker.addFirstFuture(fut);
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < 
cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -597,6 +582,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
+     * @param lastInitializedFut Last initialized exchange future.
+     */
+    public void lastTopologyFuture(GridDhtPartitionsExchangeFuture 
lastInitializedFut) {
+        this.lastInitializedFut = lastInitializedFut;
+    }
+
+    /**
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
@@ -684,7 +676,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.futQ.isEmpty();
+        return !exchWorker.exchangeQueueIsEmpty();
     }
 
     /**
@@ -739,7 +731,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /**
      * Partition refresh callback.
      */
-    private void refreshPartitions() {
+    public void refreshPartitions() {
         ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
@@ -1345,10 +1337,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         U.warn(log, "Last exchange future: " + lastInitializedFut);
 
-        U.warn(log, "Pending exchange futures:");
-
-        for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
-            U.warn(log, ">>> " + fut);
+        exchWorker.dumpFuturesDebugInfo();
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1547,28 +1536,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker 
w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to 
hang.  This check
-        // will always make sure that interrupted flag gets reset before going 
into wait conditions.
-        // The true fix should actually make sure that interrupted flag does 
not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, 
this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * @param node Target node.
      * @return {@code True} if can use compression for partition map messages.
      */
@@ -1587,276 +1554,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * Exchange future thread. All exchanges happen only by one thread and next
-     * exchange will not start until previous one completes.
-     */
-    private class ExchangeWorker extends GridWorker {
-        /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> 
futQ =
-            new LinkedBlockingDeque<>();
-
-        /** Busy flag used as performance optimization to stop current 
preloading. */
-        private volatile boolean busy;
-
-        /**
-         *
-         */
-        private ExchangeWorker() {
-            super(cctx.igniteInstanceName(), "partition-exchanger", 
GridCachePartitionExchangeManager.this.log);
-        }
-
-        /**
-         * @param exchFut Exchange future.
-         */
-        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
-            assert exchFut != null;
-
-            if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
-                futQ.offer(exchFut);
-
-            if (log.isDebugEnabled())
-                log.debug("Added exchange future to exchange worker: " + 
exchFut);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-            long timeout = cctx.gridConfig().getNetworkTimeout();
-
-            int cnt = 0;
-
-            while (!isCancelled()) {
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                cnt++;
-
-                try {
-                    boolean preloadFinished = true;
-
-                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                        preloadFinished &= cacheCtx.preloader() != null && 
cacheCtx.preloader().syncFuture().isDone();
-
-                        if (!preloadFinished)
-                            break;
-                    }
-
-                    // If not first preloading and no more topology events 
present.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() 
&& preloadFinished)
-                        timeout = cctx.gridConfig().getNetworkTimeout();
-
-                    // After workers line up and before preloading starts we 
initialize all futures.
-                    if (log.isDebugEnabled()) {
-                        Collection<IgniteInternalFuture> unfinished = new 
HashSet<>();
-
-                        for (GridDhtPartitionsExchangeFuture fut : 
exchFuts.values()) {
-                            if (!fut.isDone())
-                                unfinished.add(fut);
-                        }
-
-                        log.debug("Before waiting for exchange futures [futs" 
+ unfinished + ", worker=" + this + ']');
-                    }
-
-                    // Take next exchange future.
-                    exchFut = poll(futQ, timeout, this);
-
-                    if (exchFut == null)
-                        continue; // Main while loop.
-
-                    busy = true;
-
-                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = 
null;
-
-                    boolean dummyReassign = exchFut.dummyReassign();
-                    boolean forcePreload = exchFut.forcePreload();
-
-                    try {
-                        if (isCancelled())
-                            break;
-
-                        if (!exchFut.dummy() && !exchFut.forcePreload()) {
-                            lastInitializedFut = exchFut;
-
-                            exchFut.init();
-
-                            int dumpedObjects = 0;
-
-                            while (true) {
-                                try {
-                                    exchFut.get(2 * 
cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
-
-                                    break;
-                                }
-                                catch (IgniteFutureTimeoutCheckedException 
ignored) {
-                                    U.warn(log, "Failed to wait for partition 
map exchange [" +
-                                        "topVer=" + exchFut.topologyVersion() +
-                                        ", node=" + cctx.localNodeId() + "]. " 
+
-                                        "Dumping pending objects that might be 
the cause: ");
-
-                                    if (dumpedObjects < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
-                                        try {
-                                            
dumpDebugInfo(exchFut.topologyVersion());
-                                        }
-                                        catch (Exception e) {
-                                            U.error(log, "Failed to dump debug 
information: " + e, e);
-                                        }
-
-                                        if 
(IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, 
false))
-                                            U.dumpThreads(log);
-
-                                        dumpedObjects++;
-                                    }
-                                }
-                            }
-
-
-                            if (log.isDebugEnabled())
-                                log.debug("After waiting for exchange future 
[exchFut=" + exchFut + ", worker=" +
-                                    this + ']');
-
-                            if 
(exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
-                                lastRefresh.compareAndSet(-1, 
U.currentTimeMillis());
-
-                            boolean changed = false;
-
-                            // Just pick first worker to do this, so we don't
-                            // invoke topology callback more than once for the
-                            // same event.
-                            for (GridCacheContext cacheCtx : 
cctx.cacheContexts()) {
-                                if (cacheCtx.isLocal())
-                                    continue;
-
-                                changed |= 
cacheCtx.topology().afterExchange(exchFut);
-                            }
-
-                            if (!cctx.kernalContext().clientNode() && changed 
&& futQ.isEmpty())
-                                refreshPartitions();
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Got dummy exchange (will 
reassign)");
-
-                            if (!dummyReassign) {
-                                timeout = 0; // Force refresh.
-
-                                continue;
-                            }
-                        }
-
-                        if (!exchFut.skipPreload()) {
-                            assignsMap = new HashMap<>();
-
-                            for (GridCacheContext cacheCtx : 
cctx.cacheContexts()) {
-                                long delay = 
cacheCtx.config().getRebalanceDelay();
-
-                                GridDhtPreloaderAssignments assigns = null;
-
-                                // Don't delay for dummy reassigns to avoid 
infinite recursion.
-                                if (delay == 0 || forcePreload)
-                                    assigns = 
cacheCtx.preloader().assign(exchFut);
-
-                                assignsMap.put(cacheCtx.cacheId(), assigns);
-                            }
-                        }
-                    }
-                    finally {
-                        // Must flip busy flag before assignments are given to 
demand workers.
-                        busy = false;
-                    }
-
-                    if (assignsMap != null) {
-                        int size = assignsMap.size();
-
-                        NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
-
-                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e 
: assignsMap.entrySet()) {
-                            int cacheId = e.getKey();
-
-                            GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
-
-                            int order = cacheCtx.config().getRebalanceOrder();
-
-                            if (orderMap.get(order) == null)
-                                orderMap.put(order, new 
ArrayList<Integer>(size));
-
-                            orderMap.get(order).add(cacheId);
-                        }
-
-                        Runnable r = null;
-
-                        List<String> rebList = new LinkedList<>();
-
-                        boolean assignsCancelled = false;
-
-                        for (Integer order : orderMap.descendingKeySet()) {
-                            for (Integer cacheId : orderMap.get(order)) {
-                                GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
-
-                                GridDhtPreloaderAssignments assigns = 
assignsMap.get(cacheId);
-
-                                if (assigns != null)
-                                    assignsCancelled |= assigns.cancelled();
-
-                                // Cancels previous rebalance future (in case 
it's not done yet).
-                                // Sends previous rebalance stopped event (if 
necessary).
-                                // Creates new rebalance future.
-                                // Sends current rebalance started event (if 
necessary).
-                                // Finishes cache sync future (on empty 
assignments).
-                                Runnable cur = 
cacheCtx.preloader().addAssignments(assigns,
-                                    forcePreload,
-                                    cnt,
-                                    r,
-                                    exchFut.forcedRebalanceFuture());
-
-                                if (cur != null) {
-                                    rebList.add(U.maskName(cacheCtx.name()));
-
-                                    r = cur;
-                                }
-                            }
-                        }
-
-                        if (assignsCancelled) { // Pending exchange.
-                            U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
-                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-                        }
-                        else if (r != null) {
-                            Collections.reverse(rebList);
-
-                            U.log(log, "Rebalancing scheduled [order=" + 
rebList + "]");
-
-                            if (futQ.isEmpty()) {
-                                U.log(log, "Rebalancing started " +
-                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
-                                    ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-
-                                r.run(); // Starts rebalancing routine.
-                            }
-                            else
-                                U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
-                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
-                                    ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-                        }
-                        else
-                            U.log(log, "Skipping rebalancing (nothing 
scheduled) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
-                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-                    }
-                }
-                catch (IgniteInterruptedCheckedException e) {
-                    throw e;
-                }
-                catch (IgniteClientDisconnectedCheckedException ignored) {
-                    return;
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to wait for completion of partition 
map exchange " +
-                        "(preloading will not start): " + exchFut, e);
-                }
-            }
-        }
-    }
-
-    /**
      * Partition resend timeout object.
      */
     private class ResendTimeoutObject implements GridTimeoutObject {

Reply via email to