http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 2c9a760..5d80306 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -741,12 +741,12 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 }
 
                 // Need to wait for next topology version to remap.
-                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+                IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer);
 
-                topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
+                topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @SuppressWarnings("unchecked")
-                    @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException {
-                        AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
+                    @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException {
+                        AffinityTopologyVersion topVer = fut.get();
 
                         // This will append new futures to compound list.
                         map(F.view(keys.keySet(), new P1<KeyCacheObject>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 01e61bf..739e5ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -555,12 +555,12 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
             }
 
             if (canRemap) {
-                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+                IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer);
 
-                topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
-                    @Override public void applyx(IgniteInternalFuture<Long> fut) {
+                topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                         try {
-                            AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
+                            AffinityTopologyVersion topVer = fut.get();
 
                             remap(topVer);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1b5b8ad..a7eaadf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,6 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
     /** Update reply closure. */
+    @GridToStringExclude
     private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
     /** Pending  */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 519df17..c4f48b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -677,7 +677,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     if (remapKeys != null) {
                         assert mapErrTopVer != null;
 
-                        remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+                        remapTopVer = cctx.shared().exchange().topologyVersion();
                     }
                     else {
                         if (err != null &&

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 1a2eb22..216338f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -213,8 +213,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<V>() {
-                @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+                @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                     IgniteInternalFuture<Map<Object, Object>>  fut = tx.getAllAsync(ctx,
+                        readyTopVer,
                         Collections.singleton(ctx.toCacheKeyObject(key)),
                         deserializeBinary,
                         skipVals,
@@ -294,8 +295,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
+                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                     return tx.getAllAsync(ctx,
+                        readyTopVer,
                         ctx.cacheKeysView(keys),
                         deserializeBinary,
                         skipVals,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0cf974f..dc225cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -407,7 +407,6 @@ public class GridDhtPartitionDemander {
 
                 for (cnt = 0; cnt < lsnrCnt; cnt++) {
                     if (!sParts.get(cnt).isEmpty()) {
-
                         // Create copy.
                         GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
 
@@ -416,11 +415,12 @@ public class GridDhtPartitionDemander {
                         initD.timeout(cctx.config().getRebalanceTimeout());
 
                         synchronized (fut) {
-                            if (!fut.isDone())
+                            if (!fut.isDone()) {
                                 // Future can be already cancelled at this moment and all failovers happened.
                                 // New requests will not be covered by failovers.
                                 cctx.io().sendOrderedMessage(node,
                                     rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
+                            }
                         }
 
                         if (log.isDebugEnabled())
@@ -597,9 +597,8 @@ public class GridDhtPartitionDemander {
                         }
                     }
                     else {
-                        if (last) {
+                        if (last)
                             fut.partitionDone(id, p);
-                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
@@ -861,7 +860,7 @@ public class GridDhtPartitionDemander {
                     log.debug("Rebalancing is not required [cache=" + cctx.name() +
                         ", topology=" + topVer + "]");
 
-                checkIsDone(cancelled);
+                checkIsDone(cancelled, true);
             }
         }
 
@@ -885,7 +884,7 @@ public class GridDhtPartitionDemander {
 
                 remaining.clear();
 
-                checkIsDone(true /* cancelled */);
+                checkIsDone(true /* cancelled */, false);
             }
 
             return true;
@@ -1022,13 +1021,14 @@ public class GridDhtPartitionDemander {
          *
          */
         private void checkIsDone() {
-            checkIsDone(false);
+            checkIsDone(false, false);
         }
 
         /**
          * @param cancelled Is cancelled.
+         * @param wasEmpty {@code True} if future was created without assignments.
          */
-        private void checkIsDone(boolean cancelled) {
+        private void checkIsDone(boolean cancelled, boolean wasEmpty) {
             if (remaining.isEmpty()) {
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt))
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
@@ -1036,7 +1036,8 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                cctx.shared().exchange().scheduleResendPartitions();
+                if (!wasEmpty)
+                    cctx.shared().exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 6cbc66b..54dfb68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -187,14 +187,16 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
 
     /**
      * @param updateSeq New update sequence value.
+     * @param topVer Current topology version.
      * @return Old update sequence value.
      */
-    public long updateSequence(long updateSeq) {
+    public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
         long old = this.updateSeq;
 
         assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
 
         this.updateSeq = updateSeq;
+        this.top = topVer;
 
         return old;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 4e33d8e..d301ba9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -37,8 +37,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
@@ -113,7 +115,6 @@ class GridDhtPartitionSupplier {
      *
      * @param sc Supply context.
      * @param log Logger.
-     * @return true in case context was removed.
      */
     private static void clearContext(
         final SupplyContext sc,
@@ -126,7 +127,7 @@ class GridDhtPartitionSupplier {
                     ((GridCloseableIterator)it).close();
                 }
                 catch (IgniteCheckedException e) {
-                    log.error("Iterator close failed.", e);
+                    U.error(log, "Iterator close failed.", e);
                 }
             }
 
@@ -152,7 +153,7 @@ class GridDhtPartitionSupplier {
             while (it.hasNext()) {
                 T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
 
-                if (topVer.compareTo(t.get3()) > 0) {// Clear all obsolete contexts.
+                if (topVer.compareTo(t.get3()) > 0) { // Clear all obsolete contexts.
                     clearContext(scMap.get(t), log);
 
                     it.remove();
@@ -187,7 +188,7 @@ class GridDhtPartitionSupplier {
 
         T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
 
-        if (d.updateSequence() == -1) {//Demand node requested context cleanup.
+        if (d.updateSequence() == -1) { //Demand node requested context cleanup.
             synchronized (scMap) {
                 clearContext(scMap.remove(scId), log);
 
@@ -213,7 +214,7 @@ class GridDhtPartitionSupplier {
         ClusterNode node = cctx.discovery().node(id);
 
         if (node == null)
-            return; //Context will be cleaned at topology change.
+            return; // Context will be cleaned at topology change.
 
         try {
             SupplyContext sctx;
@@ -674,9 +675,13 @@ class GridDhtPartitionSupplier {
      * Supply context phase.
      */
     private enum SupplyContextPhase {
+        /** */
         NEW,
+        /** */
         ONHEAP,
+        /** */
         SWAP,
+        /** */
         EVICTED
     }
 
@@ -688,12 +693,15 @@ class GridDhtPartitionSupplier {
         private final SupplyContextPhase phase;
 
         /** Partition iterator. */
+        @GridToStringExclude
         private final Iterator<Integer> partIt;
 
         /** Entry iterator. */
+        @GridToStringExclude
         private final Iterator<?> entryIt;
 
         /** Swap listener. */
+        @GridToStringExclude
         private final GridCacheEntryInfoCollectSwapListener swapLsnr;
 
         /** Partition. */
@@ -708,6 +716,8 @@ class GridDhtPartitionSupplier {
         /**
          * @param phase Phase.
          * @param partIt Partition iterator.
+         * @param loc Partition.
+         * @param updateSeq Update sequence.
          * @param entryIt Entry iterator.
          * @param swapLsnr Swap listener.
          * @param part Partition.
@@ -727,6 +737,11 @@ class GridDhtPartitionSupplier {
             this.loc = loc;
             this.updateSeq = updateSeq;
         }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(SupplyContext.class, this);
+        }
     }
 
     @Deprecated//Backward compatibility. To be removed in future.
@@ -742,10 +757,8 @@ class GridDhtPartitionSupplier {
 
     @Deprecated//Backward compatibility. To be removed in future.
     public void stopOldListeners() {
-        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) {
-
+        if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled())
             cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class);
-        }
     }
 
     /**

Reply via email to