Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 d4030225a -> 51a95a14c


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51a95a14
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51a95a14
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51a95a14

Branch: refs/heads/ignite-5578
Commit: 51a95a14c1a4377407684d7de94e562c2f8dfcb3
Parents: d403022
Author: sboikov <[email protected]>
Authored: Thu Aug 3 16:33:34 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Aug 3 17:26:22 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  2 +
 .../cache/CachePartitionExchangeWorkerTask.java |  3 ++
 .../processors/cache/ClusterCachesInfo.java     | 22 +++++++++--
 .../processors/cache/ExchangeContext.java       |  2 +-
 .../GridCachePartitionExchangeManager.java      | 20 ++++++----
 .../processors/cache/GridCacheProcessor.java    |  8 +++-
 .../dht/GridDhtPartitionTopology.java           |  9 +++--
 .../dht/GridDhtPartitionTopologyImpl.java       |  7 +---
 .../preloader/CacheGroupAffinityMessage.java    | 13 ++++++-
 .../GridDhtPartitionsExchangeFuture.java        | 39 ++++++--------------
 .../preloader/GridDhtPartitionsFullMessage.java | 22 +++++------
 .../GridDhtPartitionsSingleMessage.java         |  5 ++-
 .../GridDhtPartitionsSingleRequest.java         |  9 ++---
 .../IgniteDhtPartitionCountersMap.java          |  3 ++
 .../near/GridNearTxPrepareRequest.java          |  5 +++
 .../datastreamer/PlatformDataStreamer.java      |  2 +-
 .../ignite/internal/util/GridListSet.java       |  8 ----
 17 files changed, 100 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index dc905a0..d02df5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1867,6 +1867,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
+     * @param evts Discovery events processed during exchange.
+     * @param addedOnExchnage {@code True} if cache group was added during 
this exchange.
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is 
not owner.

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
index cfbfa6e..f4c1392 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -21,5 +21,8 @@ package org.apache.ignite.internal.processors.cache;
  * Cache partition exchange worker task marker interface.
  */
 public interface CachePartitionExchangeWorkerTask {
+    /**
+     * @return {@code False} if exchange merge should stop if this task is 
found in exchange worker queue.
+     */
     boolean skipForExchangeMerge();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index bb51a3b..5e2c8db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -811,9 +811,26 @@ class ClusterCachesInfo {
 
     /**
      * @param joinedNodeId Joined node ID.
+     * @return {@code True} if there are new caches received from joined node.
+     */
+    boolean hasCachesReceivedFromJoin(UUID joinedNodeId) {
+        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+            if (desc.staticallyConfigured()) {
+                assert desc.receivedFrom() != null : desc;
+
+                if (joinedNodeId.equals(desc.receivedFrom()))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param joinedNodeId Joined node ID.
      * @return New caches received from joined node.
      */
-    @NotNull public List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID 
joinedNodeId) {
+    List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
         assert joinedNodeId != null;
 
         List<DynamicCacheDescriptor> started = null;
@@ -1732,8 +1749,7 @@ class ClusterCachesInfo {
          * DIRECT comparator for cache descriptors (first system caches).
          */
         static Comparator<DynamicCacheDescriptor> DIRECT = new 
Comparator<DynamicCacheDescriptor>() {
-            @Override
-            public int compare(DynamicCacheDescriptor o1, 
DynamicCacheDescriptor o2) {
+            @Override public int compare(DynamicCacheDescriptor o1, 
DynamicCacheDescriptor o2) {
                 if (!o1.cacheType().userCache())
                     return -1;
                 if (!o2.cacheType().userCache())

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 6442752..4046c98 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -65,7 +65,7 @@ public class ExchangeContext {
         }
         else {
             boolean startCaches = fut.exchangeId().isJoined() &&
-                
fut.sharedContext().cache().receivedCachesFromNodeJoin(fut.exchangeId().eventNode());
+                
fut.sharedContext().cache().hasCachesReceivedFromJoin(fut.exchangeId().eventNode());
 
             fetchAffOnJoin = protocolVer == 1;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 75ddacd..efb04a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -205,6 +205,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private final GridFutureAdapter<?> crdInitFut = new GridFutureAdapter();
 
+    /** For tests only. */
+    private volatile AffinityTopologyVersion exchMergeTestWaitVer;
+
     /** Discovery listener. */
     private final DiscoveryEventListener discoLsnr = new 
DiscoveryEventListener() {
         @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) {
@@ -371,7 +374,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         }
     }
 
-    public void coordinatorInitialized() {
+    /**
+     *
+     */
+    public void onCoordinatorInitialized() {
         crdInitFut.onDone();
     }
 
@@ -1264,7 +1270,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param topVer Topology version.
+     * @param topVer Exchange result topology version.
+     * @param initTopVer Exchange initial version.
      * @param err Error.
      */
     public void onExchangeDone(AffinityTopologyVersion topVer, 
AffinityTopologyVersion initTopVer, @Nullable Throwable err) {
@@ -1757,9 +1764,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx);
     }
 
-    /** */
-    private volatile AffinityTopologyVersion exchMergeTestWaitVer;
-
     /**
      * For testing only.
      *
@@ -1810,7 +1814,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     final GridDhtPartitionsSingleMessage pendingMsg = 
fut.mergeJoinExchangeOnDone(curFut);
 
                     if (pendingMsg != null)
-                        curFut.waitAndReplyToNode(evt.eventNode(), pendingMsg);
+                        curFut.waitAndReplyToNode(evt.eventNode().id(), 
pendingMsg);
                 }
 
                 exchWorker.futQ.remove(fut);
@@ -1903,7 +1907,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                         break;
                     }
-                    if (evt.type() == EVT_NODE_JOINED && 
cctx.cache().receivedCachesFromNodeJoin(node)) {
+                    if (evt.type() == EVT_NODE_JOINED && 
cctx.cache().hasCachesReceivedFromJoin(node)) {
                         log.info("Stop merge, received caches from node: " + 
node);
 
                         break;
@@ -2037,7 +2041,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
          */
         private void onExchangeDone(AffinityTopologyVersion resVer, 
GridDhtPartitionsExchangeFuture exchFut)
             throws IgniteInterruptedCheckedException {
-            if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) 
{
+            if (resVer.compareTo(exchFut.initialVersion()) != 0) {
                 waitForExchangeFuture(resVer);
 
                 for (CachePartitionExchangeWorkerTask task : futQ) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f616e5b..e7bfb9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1760,8 +1760,12 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         }
     }
 
-    public boolean receivedCachesFromNodeJoin(ClusterNode node) {
-        return !cachesInfo.cachesReceivedFromJoin(node.id()).isEmpty();
+    /**
+     * @param node Joined node.
+     * @return {@code True} if there are new caches received from joined node.
+     */
+    boolean hasCachesReceivedFromJoin(ClusterNode node) {
+        return cachesInfo.hasCachesReceivedFromJoin(node.id());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 1887473..3365e52 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -69,9 +69,7 @@ public interface GridDhtPartitionTopology {
     ) throws IgniteInterruptedCheckedException;
 
     /**
-     * Topology version.
-     *
-     * @return Topology version.
+     * @return Result topology version of last finished exchange.
      */
     public AffinityTopologyVersion readyTopologyVersion();
 
@@ -103,6 +101,7 @@ public interface GridDhtPartitionTopology {
      *
      * @param exchFut Exchange future.
      * @param affReady Affinity ready flag.
+     * @param updateMoving
      * @throws IgniteCheckedException If failed.
      */
     public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
@@ -111,6 +110,7 @@ public interface GridDhtPartitionTopology {
         throws IgniteCheckedException;
 
     /**
+     * @param affVer Affinity version.
      * @param exchFut Exchange future.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
@@ -272,6 +272,7 @@ public interface GridDhtPartitionTopology {
     /**
      * @param exchId Exchange ID.
      * @param parts Partitions.
+     * @param force {@code True} to skip stale update check.
      * @return {@code True} if local state was changed.
      */
     public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
@@ -297,6 +298,8 @@ public interface GridDhtPartitionTopology {
 
     /**
      * Resets the state of all LOST partitions to OWNING.
+     *
+     * @param resTopVer Exchange result version.
      */
     public void resetLostPartitions(AffinityTopologyVersion resTopVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5e58502..1770497 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -110,12 +110,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     private volatile AffinityTopologyVersion diffFromAffinityVer = 
AffinityTopologyVersion.NONE;
 
     /** */
-    //private AffinityTopologyVersion lastExchangeVer;
-
-    /** */
-    // private volatile AffinityTopologyVersion readyTopVer = 
AffinityTopologyVersion.NONE;
-
-    /** */
     private volatile AffinityTopologyVersion readyTopVer = 
AffinityTopologyVersion.NONE;
 
     /** */
@@ -401,6 +395,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /**
+     * @param affVer Affinity version.
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
index f6d870f..179c76f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java
@@ -39,7 +39,7 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
- *
+ * Information about affinity assignment.
  */
 public class CacheGroupAffinityMessage implements Message {
     /** */
@@ -62,6 +62,7 @@ public class CacheGroupAffinityMessage implements Message {
 
     /**
      * @param assign0 Assignment.
+     * @param assignDiff0 Difference with ideal affinity assignment.
      */
     private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0, 
Map<Integer, List<Long>> assignDiff0) {
         if (assign0 != null) {
@@ -95,7 +96,12 @@ public class CacheGroupAffinityMessage implements Message {
         }
     }
 
-    public static Map<Integer, CacheGroupAffinityMessage> 
createAffinityDiffMessages(Map<Integer, Map<Integer, List<Long>>> affDiff) {
+    /**
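+     * @param affDiff Affinity assignment differences to convert (presumably keyed by cache group ID).
+     * @return Messages created from {@code affDiff}, or {@code null} if {@code affDiff} is empty.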
+     */
+    public static Map<Integer, CacheGroupAffinityMessage> 
createAffinityDiffMessages(
+        Map<Integer, Map<Integer, List<Long>>> affDiff) {
         if (F.isEmpty(affDiff))
             return null;
 
@@ -181,6 +187,9 @@ public class CacheGroupAffinityMessage implements Message {
         return assignments0;
     }
 
+    /**
+     * @return Difference with ideal affinity assignment.
+     */
     public Map<Integer, GridLongList> assignmentsDiff() {
         return assignsDiff;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b43fe92..528a85b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -266,7 +266,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private GridDhtPartitionsExchangeFuture mergedWith;
 
-
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -296,7 +295,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         exchLog = cctx.logger(EXCHANGE_LOG);
 
         initFut = new GridFutureAdapter<Boolean>() {
-            @Nullable @Override public IgniteLogger logger() {
+            @Override public IgniteLogger logger() {
                 return log;
             }
         };
@@ -512,7 +511,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (fut != null)
                 fut.get();
 
-            cctx.exchange().coordinatorInitialized();
+            cctx.exchange().onCoordinatorInitialized();
         }
     }
 
@@ -1028,7 +1027,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         try {
             long start = U.currentTimeMillis();
 
-            IgniteInternalFuture fut = 
cctx.snapshot().tryStartLocalSnapshotOperation(events().lastEvent());
+            IgniteInternalFuture fut = 
cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt);
 
             if (fut != null) {
                 fut.get();
@@ -1272,7 +1271,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param msg Message to send.
      * @param nodes Nodes.
+     * @param mergedJoinExchMsgs Messages received from merged 'join node' 
exchanges.
      * @param joinedNodeAff Affinity if was requested by some nodes.
      */
     private void sendAllPartitions(
@@ -1555,7 +1556,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         if (CU.clientNode(node)) {
             if (msg != null)
-                waitAndReplyToClient(nodeId, msg);
+                waitAndReplyToNode(nodeId, msg);
         }
         else {
             if (mergedJoinExchMsgs == null)
@@ -1640,7 +1641,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private void processMergedMessage(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
         if (msg.client()) {
-            waitAndReplyToClient(node.id(), msg);
+            waitAndReplyToNode(node.id(), msg);
 
             return;
         }
@@ -1760,29 +1761,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         });
     }
 
-    public void waitAndReplyToNode(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
-        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-            @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                FinishState finishState0;
-
-                synchronized (mux) {
-                    finishState0 = finishState;
-                }
-
-                assert finishState0 != null;
-
-                sendAllPartitionsToNode(finishState0, msg, node.id());
-            }
-        });
-    }
-
     /**
      * @param nodeId Node ID.
      * @param msg Client's message.
      */
-    private void waitAndReplyToClient(final UUID nodeId, final 
GridDhtPartitionsSingleMessage msg) {
-        assert msg.client();
-
+    public void waitAndReplyToNode(final UUID nodeId, final 
GridDhtPartitionsSingleMessage msg) {
         listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
             @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 if (cctx.kernalContext().isStopping())
@@ -1814,9 +1797,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param nodeId Sender node.
      * @param msg Partition single message.
      */
-    void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) 
{
+    private void processSingleMessage(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
         if (msg.client()) {
-            waitAndReplyToClient(nodeId, msg);
+            waitAndReplyToNode(nodeId, msg);
 
             return;
         }
@@ -3078,7 +3061,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) {
         boolean allRcvd = false;
 
-        cctx.exchange().coordinatorInitialized();
+        cctx.exchange().onCoordinatorInitialized();
 
         if (newCrdFut.restoreState()) {
             GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage();

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index b060704..a164e85 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -164,6 +164,17 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /**
+     * @return Message copy.
+     */
+    GridDhtPartitionsFullMessage copy() {
+        GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
+
+        copyStateTo(cp);
+
+        return cp;
+    }
+
+    /**
      * @param resTopVer Result topology version.
      */
     public void resultTopologyVersion(AffinityTopologyVersion resTopVer) {
@@ -178,17 +189,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     }
 
     /**
-     * @return Message copy.
-     */
-    GridDhtPartitionsFullMessage copy() {
-        GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage();
-
-        copyStateTo(cp);
-
-        return cp;
-    }
-
-    /**
      * @return Caches affinity for joining nodes.
      */
     @Nullable public Map<Integer, CacheGroupAffinityMessage> 
joinedNodeAffinity() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index ed50634..bc7d314 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -94,7 +94,10 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
     @GridDirectCollection(Integer.class)
     private Collection<Integer> grpsAffRequest;
 
-    /** */
+    /**
+     * Exchange finish message, sent to new coordinator when it tries to
+     * restore state after previous coordinator failed during exchange.
+     */
     private GridDhtPartitionsFullMessage finishMsg;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 82feb12..6317fbc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -60,19 +60,18 @@ public class GridDhtPartitionsSingleRequest extends 
GridDhtPartitionsAbstractMes
 
         msg.restoreState(true);
 
-        msg.restoreExchangeId(restoreExchId);
+        msg.restoreExchId = restoreExchId;
 
         return msg;
     }
 
+    /**
+     * @return ID of current exchange on new coordinator.
+     */
     GridDhtPartitionExchangeId restoreExchangeId() {
         return restoreExchId;
     }
 
-    void restoreExchangeId(GridDhtPartitionExchangeId restoreExchId) {
-        this.restoreExchId = restoreExchId;
-    }
-
     /** {@inheritDoc} */
     @Override public int handlerId() {
         return 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 8266052..dc2fbf8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -34,6 +34,9 @@ public class IgniteDhtPartitionCountersMap implements 
Serializable {
     /** */
     private Map<Integer, Map<Integer, T2<Long, Long>>> map;
 
+    /**
+     * @return {@code True} if map is empty.
+     */
     public synchronized boolean empty() {
         return map == null || map.isEmpty();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 9744c48..7deceb5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -100,6 +100,7 @@ public class GridNearTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
      * @param firstClientReq {@code True} if first optimistic tx prepare 
request sent from client node.
+     * @param allowWaitTopFut {@code True} if it is safe for first client request to wait for 
topology future.
      * @param addDepInfo Deployment info flag.
      */
     public GridNearTxPrepareRequest(
@@ -147,6 +148,10 @@ public class GridNearTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
         setFlag(allowWaitTopFut, ALLOW_WAIT_TOP_FUT_FLAG_MASK);
     }
 
+    /**
+     * @return {@code True} if it is safe for first client request to wait for 
topology future
+     *      completion.
+     */
     public boolean allowWaitTopologyFuture() {
         return isFlag(ALLOW_WAIT_TOP_FUT_FLAG_MASK);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 7c893b5..fba0a4c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -210,7 +210,7 @@ public class PlatformDataStreamer extends 
PlatformAbstractTarget {
                 GridDiscoveryManager discoMgr = 
platformCtx.kernalContext().discovery();
 
                 AffinityTopologyVersion topVer =
-                        
platformCtx.kernalContext().cache().context().exchange().readyAffinityVersion();
+                    
platformCtx.kernalContext().cache().context().exchange().lastTopologyFuture().get();
 
                 int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a95a14/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
index 1a632b0..6226bd2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java
@@ -373,14 +373,6 @@ public class GridListSet<V> extends GridSerializableSet<V> 
implements Cloneable
         return vals.iterator();
     }
 
-    /**
-     * @param idx Start index.
-     * @return List iterator.
-     */
-    public ListIterator<V> listIterator(int idx) {
-        return vals.listIterator(idx);
-    }
-
     /** {@inheritDoc} */
     @Override public int size() {
         return vals.size();

Reply via email to