WIP.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92524a44
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92524a44
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92524a44

Branch: refs/heads/ignite-4565-ddl
Commit: 92524a44eb8ccfb43901557f23368fc325e46c59
Parents: 23f67a5
Author: devozerov <[email protected]>
Authored: Fri Mar 17 16:04:37 2017 +0300
Committer: devozerov <[email protected]>
Committed: Fri Mar 17 16:04:37 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorker.java     | 355 -------------------
 .../cache/CachePartitionExchangeWorkerTask.java |  29 ++
 .../GridCachePartitionExchangeManager.java      | 335 ++++++++++++++++-
 .../GridDhtPartitionsExchangeFuture.java        |   8 +-
 4 files changed, 357 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
deleted file mode 100644
index 98a9cc0..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
-
-/**
- * Exchange future thread. All exchanges happen only by one thread and next
- * exchange will not start until previous one completes.
- */
-public class CachePartitionExchangeWorker<K, V> extends GridWorker {
-    /** Cache context. */
-    private final GridCacheSharedContext<K, V> cctx;
-
-    /** Exchange manager. */
-    private final GridCachePartitionExchangeManager<K, V> exchMgr;
-
-    /** Future queue. */
-    private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
-        new LinkedBlockingDeque<>();
-
-    /** Busy flag used as performance optimization to stop current preloading. 
*/
-    private volatile boolean busy;
-
-    /**
-     * Constructor.
-     *
-     * @param exchMgr Exchange manager.
-     * @param log Logger.
-     */
-    public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, 
V> exchMgr, IgniteLogger log) {
-        super(exchMgr.context().igniteInstanceName(), "partition-exchanger", 
log);
-
-        this.cctx = exchMgr.context();
-
-        this.exchMgr = exchMgr;
-    }
-
-    /**
-     * Add first exchange future.
-     *
-     * @param fut Future.
-     */
-    public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) {
-        futQ.addFirst(fut);
-    }
-
-    /**
-     * @param exchFut Exchange future.
-     */
-    void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
-        assert exchFut != null;
-
-        if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy))
-            futQ.offer(exchFut);
-
-        if (log.isDebugEnabled())
-            log.debug("Added exchange future to exchange worker: " + exchFut);
-    }
-
-    /**
-     * Dump debug info.
-     */
-    public void dumpFuturesDebugInfo() {
-        U.warn(log, "Pending exchange futures:");
-
-        for (GridDhtPartitionsExchangeFuture fut : futQ)
-            U.warn(log, ">>> " + fut);
-    }
-
-    /**
-     * @return {@code True} iif exchange queue is empty.
-     */
-    public boolean exchangeQueueIsEmpty() {
-        return futQ.isEmpty();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-        long timeout = cctx.gridConfig().getNetworkTimeout();
-
-        int cnt = 0;
-
-        while (!isCancelled()) {
-            GridDhtPartitionsExchangeFuture exchFut = null;
-
-            cnt++;
-
-            try {
-                boolean preloadFinished = true;
-
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    preloadFinished &= cacheCtx.preloader() != null && 
cacheCtx.preloader().syncFuture().isDone();
-
-                    if (!preloadFinished)
-                        break;
-                }
-
-                // If not first preloading and no more topology events present.
-                if (!cctx.kernalContext().clientNode() && 
exchangeQueueIsEmpty() && preloadFinished)
-                    timeout = cctx.gridConfig().getNetworkTimeout();
-
-                // After workers line up and before preloading starts we 
initialize all futures.
-                if (log.isDebugEnabled()) {
-                    Collection<IgniteInternalFuture> unfinished = new 
HashSet<>();
-
-                    for (GridDhtPartitionsExchangeFuture fut : 
exchMgr.exchangeFutures()) {
-                        if (!fut.isDone())
-                            unfinished.add(fut);
-                    }
-
-                    log.debug("Before waiting for exchange futures [futs" + 
unfinished + ", worker=" + this + ']');
-                }
-
-                // Take next exchange future.
-                if (isCancelled())
-                    Thread.currentThread().interrupt();
-
-                exchFut = futQ.poll(timeout, MILLISECONDS);
-
-                if (exchFut == null)
-                    continue; // Main while loop.
-
-                busy = true;
-
-                Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
-                boolean dummyReassign = exchFut.dummyReassign();
-                boolean forcePreload = exchFut.forcePreload();
-
-                try {
-                    if (isCancelled())
-                        break;
-
-                    if (!exchFut.dummy() && !exchFut.forcePreload()) {
-                        exchMgr.lastTopologyFuture(exchFut);
-
-                        exchFut.init();
-
-                        int dumpedObjects = 0;
-
-                        while (true) {
-                            try {
-                                exchFut.get(2 * 
cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
-
-                                break;
-                            }
-                            catch (IgniteFutureTimeoutCheckedException 
ignored) {
-                                U.warn(log, "Failed to wait for partition map 
exchange [" +
-                                    "topVer=" + exchFut.topologyVersion() +
-                                    ", node=" + cctx.localNodeId() + "]. " +
-                                    "Dumping pending objects that might be the 
cause: ");
-
-                                if (dumpedObjects < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
-                                    try {
-                                        
exchMgr.dumpDebugInfo(exchFut.topologyVersion());
-                                    }
-                                    catch (Exception e) {
-                                        U.error(log, "Failed to dump debug 
information: " + e, e);
-                                    }
-
-                                    if 
(IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, 
false))
-                                        U.dumpThreads(log);
-
-                                    dumpedObjects++;
-                                }
-                            }
-                        }
-
-
-                        if (log.isDebugEnabled())
-                            log.debug("After waiting for exchange future 
[exchFut=" + exchFut + ", worker=" +
-                                this + ']');
-
-                        boolean changed = false;
-
-                        // Just pick first worker to do this, so we don't
-                        // invoke topology callback more than once for the
-                        // same event.
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) 
{
-                            if (cacheCtx.isLocal())
-                                continue;
-
-                            changed |= 
cacheCtx.topology().afterExchange(exchFut);
-                        }
-
-                        if (!cctx.kernalContext().clientNode() && changed && 
exchangeQueueIsEmpty())
-                            exchMgr.refreshPartitions();
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Got dummy exchange (will reassign)");
-
-                        if (!dummyReassign) {
-                            timeout = 0; // Force refresh.
-
-                            continue;
-                        }
-                    }
-
-                    if (!exchFut.skipPreload()) {
-                        assignsMap = new HashMap<>();
-
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) 
{
-                            long delay = cacheCtx.config().getRebalanceDelay();
-
-                            GridDhtPreloaderAssignments assigns = null;
-
-                            // Don't delay for dummy reassigns to avoid 
infinite recursion.
-                            if (delay == 0 || forcePreload)
-                                assigns = cacheCtx.preloader().assign(exchFut);
-
-                            assignsMap.put(cacheCtx.cacheId(), assigns);
-                        }
-                    }
-                }
-                finally {
-                    // Must flip busy flag before assignments are given to 
demand workers.
-                    busy = false;
-                }
-
-                if (assignsMap != null) {
-                    int size = assignsMap.size();
-
-                    NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
-
-                    for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : 
assignsMap.entrySet()) {
-                        int cacheId = e.getKey();
-
-                        GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
-
-                        int order = cacheCtx.config().getRebalanceOrder();
-
-                        if (orderMap.get(order) == null)
-                            orderMap.put(order, new ArrayList<Integer>(size));
-
-                        orderMap.get(order).add(cacheId);
-                    }
-
-                    Runnable r = null;
-
-                    List<String> rebList = new LinkedList<>();
-
-                    boolean assignsCancelled = false;
-
-                    for (Integer order : orderMap.descendingKeySet()) {
-                        for (Integer cacheId : orderMap.get(order)) {
-                            GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
-
-                            GridDhtPreloaderAssignments assigns = 
assignsMap.get(cacheId);
-
-                            if (assigns != null)
-                                assignsCancelled |= assigns.cancelled();
-
-                            // Cancels previous rebalance future (in case it's 
not done yet).
-                            // Sends previous rebalance stopped event (if 
necessary).
-                            // Creates new rebalance future.
-                            // Sends current rebalance started event (if 
necessary).
-                            // Finishes cache sync future (on empty 
assignments).
-                            Runnable cur = 
cacheCtx.preloader().addAssignments(assigns,
-                                forcePreload,
-                                cnt,
-                                r,
-                                exchFut.forcedRebalanceFuture());
-
-                            if (cur != null) {
-                                rebList.add(U.maskName(cacheCtx.name()));
-
-                                r = cur;
-                            }
-                        }
-                    }
-
-                    if (assignsCancelled) { // Pending exchange.
-                        U.log(log, "Skipping rebalancing (obsolete exchange 
ID) " +
-                            "[top=" + exchFut.topologyVersion() + ", evt=" + 
exchFut.discoveryEvent().name() +
-                            ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-                    }
-                    else if (r != null) {
-                        Collections.reverse(rebList);
-
-                        U.log(log, "Rebalancing scheduled [order=" + rebList + 
"]");
-
-                        if (exchangeQueueIsEmpty()) {
-                            U.log(log, "Rebalancing started " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
-                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-
-                            r.run(); // Starts rebalancing routine.
-                        }
-                        else
-                            U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
-                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-                    }
-                    else
-                        U.log(log, "Skipping rebalancing (nothing scheduled) " 
+
-                            "[top=" + exchFut.topologyVersion() + ", evt=" + 
exchFut.discoveryEvent().name() +
-                            ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteClientDisconnectedCheckedException ignored) {
-                return;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to wait for completion of partition map 
exchange " +
-                    "(preloading will not start): " + exchFut, e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
new file mode 100644
index 0000000..80ef9f5
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Task handled by the cache partition exchange worker (see {@link #isExchange()} for the task kinds).
+ */
+public interface CachePartitionExchangeWorkerTask {
+    /**
+     * @return {@code True} if the task is a standard exchange task, {@code false} if it is a custom task which
+     * must be executed from within the exchange thread.
+     */
+    boolean isExchange();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f9222bc..444b530 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,15 +21,24 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +73,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -80,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
@@ -87,6 +98,7 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -114,9 +126,12 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
+    /** Last partition refresh. */
+    private final AtomicLong lastRefresh = new AtomicLong(-1);
+
     /** */
     @GridToStringInclude
-    private CachePartitionExchangeWorker exchWorker;
+    private ExchangeWorker exchWorker;
 
     /** */
     @GridToStringExclude
@@ -282,7 +297,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        exchWorker = new CachePartitionExchangeWorker<>(this, log);
+        exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, 
EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
@@ -582,13 +597,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param lastInitializedFut Last completed topology future.
-     */
-    public void lastTopologyFuture(GridDhtPartitionsExchangeFuture 
lastInitializedFut) {
-        this.lastInitializedFut = lastInitializedFut;
-    }
-
-    /**
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
@@ -676,7 +684,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.exchangeQueueIsEmpty();
+        return exchWorker.hasPendingExchange();
     }
 
     /**
@@ -731,7 +739,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /**
      * Partition refresh callback.
      */
-    public void refreshPartitions() {
+    private void refreshPartitions() {
         ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
@@ -811,8 +819,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         @Nullable GridCacheVersion lastVer,
         boolean compress) {
         GridDhtPartitionsFullMessage m = new 
GridDhtPartitionsFullMessage(exchId,
-                lastVer,
-                exchId != null ? exchId.topologyVersion() : 
AffinityTopologyVersion.NONE);
+            lastVer,
+            exchId != null ? exchId.topologyVersion() : 
AffinityTopologyVersion.NONE);
 
         boolean useOldApi = false;
 
@@ -1337,7 +1345,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         U.warn(log, "Last exchange future: " + lastInitializedFut);
 
-        exchWorker.dumpFuturesDebugInfo();
+        exchWorker.dumpExchangeDebugInfo();
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1554,6 +1562,305 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
+     * Exchange future thread. All exchanges happen only by one thread and next
+     * exchange will not start until previous one completes.
+     */
+    private class ExchangeWorker extends GridWorker {
+        /** Future queue. */
+        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> 
futQ =
+            new LinkedBlockingDeque<>();
+
+        /** Busy flag used as performance optimization to stop current 
preloading. */
+        private volatile boolean busy;
+
+        /**
+         * Constructor.
+         */
+        private ExchangeWorker() {
+            super(cctx.igniteInstanceName(), "partition-exchanger", 
GridCachePartitionExchangeManager.this.log);
+        }
+
+        /**
+         * Add first exchange future.
+         *
+         * @param exchFut Exchange future.
+         */
+        void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+            futQ.addFirst(exchFut);
+        }
+
+        /**
+         * @param exchFut Exchange future.
+         */
+        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+            assert exchFut != null;
+
+            if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
+                futQ.offer(exchFut);
+
+            if (log.isDebugEnabled())
+                log.debug("Added exchange future to exchange worker: " + 
exchFut);
+        }
+
+        /**
+         * @return Whether pending exchange future exists.
+         */
+        boolean hasPendingExchange() {
+            return !futQ.isEmpty();
+        }
+
+        /**
+         * Dump debug info.
+         */
+        void dumpExchangeDebugInfo() {
+            U.warn(log, "Pending exchange futures:");
+
+            for (GridDhtPartitionsExchangeFuture fut : futQ)
+                U.warn(log, ">>> " + fut);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            long timeout = cctx.gridConfig().getNetworkTimeout();
+
+            int cnt = 0;
+
+            while (!isCancelled()) {
+                GridDhtPartitionsExchangeFuture exchFut = null;
+
+                cnt++;
+
+                try {
+                    boolean preloadFinished = true;
+
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        preloadFinished &= cacheCtx.preloader() != null && 
cacheCtx.preloader().syncFuture().isDone();
+
+                        if (!preloadFinished)
+                            break;
+                    }
+
+                    // If not first preloading and no more topology events 
present.
+                    if (!cctx.kernalContext().clientNode() && 
!hasPendingExchange() && preloadFinished)
+                        timeout = cctx.gridConfig().getNetworkTimeout();
+
+                    // After workers line up and before preloading starts we 
initialize all futures.
+                    if (log.isDebugEnabled()) {
+                        Collection<IgniteInternalFuture> unfinished = new 
HashSet<>();
+
+                        for (GridDhtPartitionsExchangeFuture fut : 
exchFuts.values()) {
+                            if (!fut.isDone())
+                                unfinished.add(fut);
+                        }
+
+                        log.debug("Before waiting for exchange futures [futs" 
+ unfinished + ", worker=" + this + ']');
+                    }
+
+                    // Take next exchange future.
+                    if (isCancelled())
+                        Thread.currentThread().interrupt();
+
+                    exchFut = futQ.poll(timeout, MILLISECONDS);
+
+                    if (exchFut == null)
+                        continue; // Main while loop.
+
+                    busy = true;
+
+                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = 
null;
+
+                    boolean dummyReassign = exchFut.dummyReassign();
+                    boolean forcePreload = exchFut.forcePreload();
+
+                    try {
+                        if (isCancelled())
+                            break;
+
+                        if (!exchFut.dummy() && !exchFut.forcePreload()) {
+                            lastInitializedFut = exchFut;
+
+                            exchFut.init();
+
+                            int dumpedObjects = 0;
+
+                            while (true) {
+                                try {
+                                    exchFut.get(2 * 
cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                                    break;
+                                }
+                                catch (IgniteFutureTimeoutCheckedException 
ignored) {
+                                    U.warn(log, "Failed to wait for partition 
map exchange [" +
+                                        "topVer=" + exchFut.topologyVersion() +
+                                        ", node=" + cctx.localNodeId() + "]. " 
+
+                                        "Dumping pending objects that might be 
the cause: ");
+
+                                    if (dumpedObjects < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+                                        try {
+                                            
dumpDebugInfo(exchFut.topologyVersion());
+                                        }
+                                        catch (Exception e) {
+                                            U.error(log, "Failed to dump debug 
information: " + e, e);
+                                        }
+
+                                        if 
(IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, 
false))
+                                            U.dumpThreads(log);
+
+                                        dumpedObjects++;
+                                    }
+                                }
+                            }
+
+
+                            if (log.isDebugEnabled())
+                                log.debug("After waiting for exchange future 
[exchFut=" + exchFut + ", worker=" +
+                                    this + ']');
+
+                            if 
(exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
+                                lastRefresh.compareAndSet(-1, 
U.currentTimeMillis());
+
+                            boolean changed = false;
+
+                            // Just pick first worker to do this, so we don't
+                            // invoke topology callback more than once for the
+                            // same event.
+                            for (GridCacheContext cacheCtx : 
cctx.cacheContexts()) {
+                                if (cacheCtx.isLocal())
+                                    continue;
+
+                                changed |= 
cacheCtx.topology().afterExchange(exchFut);
+                            }
+
+                            if (!cctx.kernalContext().clientNode() && changed 
&& !hasPendingExchange())
+                                refreshPartitions();
+                        }
+                        else {
+                            if (log.isDebugEnabled())
+                                log.debug("Got dummy exchange (will 
reassign)");
+
+                            if (!dummyReassign) {
+                                timeout = 0; // Force refresh.
+
+                                continue;
+                            }
+                        }
+
+                        if (!exchFut.skipPreload()) {
+                            assignsMap = new HashMap<>();
+
+                            for (GridCacheContext cacheCtx : 
cctx.cacheContexts()) {
+                                long delay = 
cacheCtx.config().getRebalanceDelay();
+
+                                GridDhtPreloaderAssignments assigns = null;
+
+                                // Don't delay for dummy reassigns to avoid 
infinite recursion.
+                                if (delay == 0 || forcePreload)
+                                    assigns = 
cacheCtx.preloader().assign(exchFut);
+
+                                assignsMap.put(cacheCtx.cacheId(), assigns);
+                            }
+                        }
+                    }
+                    finally {
+                        // Must flip busy flag before assignments are given to 
demand workers.
+                        busy = false;
+                    }
+
+                    if (assignsMap != null) {
+                        int size = assignsMap.size();
+
+                        NavigableMap<Integer, List<Integer>> orderMap = new 
TreeMap<>();
+
+                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e 
: assignsMap.entrySet()) {
+                            int cacheId = e.getKey();
+
+                            GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+
+                            int order = cacheCtx.config().getRebalanceOrder();
+
+                            if (orderMap.get(order) == null)
+                                orderMap.put(order, new 
ArrayList<Integer>(size));
+
+                            orderMap.get(order).add(cacheId);
+                        }
+
+                        Runnable r = null;
+
+                        List<String> rebList = new LinkedList<>();
+
+                        boolean assignsCancelled = false;
+
+                        for (Integer order : orderMap.descendingKeySet()) {
+                            for (Integer cacheId : orderMap.get(order)) {
+                                GridCacheContext<K, V> cacheCtx = 
cctx.cacheContext(cacheId);
+
+                                GridDhtPreloaderAssignments assigns = 
assignsMap.get(cacheId);
+
+                                if (assigns != null)
+                                    assignsCancelled |= assigns.cancelled();
+
+                                // Cancels previous rebalance future (in case 
it's not done yet).
+                                // Sends previous rebalance stopped event (if 
necessary).
+                                // Creates new rebalance future.
+                                // Sends current rebalance started event (if 
necessary).
+                                // Finishes cache sync future (on empty 
assignments).
+                                Runnable cur = 
cacheCtx.preloader().addAssignments(assigns,
+                                    forcePreload,
+                                    cnt,
+                                    r,
+                                    exchFut.forcedRebalanceFuture());
+
+                                if (cur != null) {
+                                    rebList.add(U.maskName(cacheCtx.name()));
+
+                                    r = cur;
+                                }
+                            }
+                        }
+
+                        if (assignsCancelled) { // Pending exchange.
+                            U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
+                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
+                        else if (r != null) {
+                            Collections.reverse(rebList);
+
+                            U.log(log, "Rebalancing scheduled [order=" + 
rebList + "]");
+
+                            if (!hasPendingExchange()) {
+                                U.log(log, "Rebalancing started " +
+                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
+                                    ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+
+                                r.run(); // Starts rebalancing routine.
+                            }
+                            else
+                                U.log(log, "Skipping rebalancing (obsolete 
exchange ID) " +
+                                    "[top=" + exchFut.topologyVersion() + ", 
evt=" + exchFut.discoveryEvent().name() +
+                                    ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
+                        else
+                            U.log(log, "Skipping rebalancing (nothing 
scheduled) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" 
+ exchFut.discoveryEvent().name() +
+                                ", node=" + 
exchFut.discoveryEvent().eventNode().id() + ']');
+                    }
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw e;
+                }
+                catch (IgniteClientDisconnectedCheckedException ignored) {
+                    return;
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to wait for completion of partition 
map exchange " +
+                        "(preloading will not start): " + exchFut, e);
+                }
+            }
+        }
+    }
+
+    /**
      * Partition resend timeout object.
      */
     private class ResendTimeoutObject implements GridTimeoutObject {

http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46fb144..50937a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -84,7 +85,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  * Future for exchanging partition maps.
  */
 public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, 
GridDhtTopologyFuture {
+    implements Comparable<GridDhtPartitionsExchangeFuture>, 
GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
     /** */
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD,
 10);
@@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isExchange() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);
     }

Reply via email to