Repository: ignite
Updated Branches:
  refs/heads/ignite-5872-5578 [created] 30339d35c


Merge remote-tracking branch 'apache/ignite-5578' into ignite-5872-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30339d35
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30339d35
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30339d35

Branch: refs/heads/ignite-5872-5578
Commit: 30339d35c9bba365db0369ca8d892efb3a2b3a96
Parents: 855ece3 bc9a416
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
Authored: Thu Aug 10 17:09:53 2017 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu Aug 10 17:09:53 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |    3 +
 .../java/org/apache/ignite/TestDebugLog.java    |  219 ++
 .../internal/IgniteDiagnosticMessage.java       |    2 +-
 .../communication/GridIoMessageFactory.java     |    9 +-
 .../internal/managers/discovery/DiscoCache.java |   79 +-
 .../discovery/GridDiscoveryManager.java         |   28 +-
 .../affinity/AffinityTopologyVersion.java       |    7 +
 .../affinity/GridAffinityAssignmentCache.java   |   42 +
 .../affinity/GridAffinityProcessor.java         |    8 +-
 .../cache/CacheAffinitySharedManager.java       |  688 ++++--
 .../processors/cache/CacheGroupContext.java     |   18 +-
 .../cache/CachePartitionExchangeWorkerTask.java |    5 +-
 .../ClientCacheChangeDummyDiscoveryMessage.java |    5 +
 .../cache/ClientCacheUpdateTimeout.java         |    5 +
 .../processors/cache/ClusterCachesInfo.java     |   22 +-
 .../processors/cache/ExchangeContext.java       |  131 ++
 .../cache/ExchangeDiscoveryEvents.java          |  262 +++
 .../processors/cache/GridCacheAdapter.java      |    8 +-
 .../processors/cache/GridCacheContext.java      |    2 +-
 .../processors/cache/GridCacheIoManager.java    |   57 +-
 .../processors/cache/GridCacheMapEntry.java     |    6 +-
 .../GridCachePartitionExchangeManager.java      |  413 +++-
 .../processors/cache/GridCacheProcessor.java    |   14 +-
 .../dht/ClientCacheDhtTopologyFuture.java       |   12 +-
 .../dht/GridClientPartitionTopology.java        |  132 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   18 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    4 +-
 .../dht/GridDhtPartitionTopology.java           |   37 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  492 +++--
 .../dht/GridDhtPartitionsReservation.java       |    2 +-
 .../distributed/dht/GridDhtTopologyFuture.java  |   36 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   95 +-
 .../dht/GridPartitionedGetFuture.java           |    4 +-
 .../dht/GridPartitionedSingleGetFuture.java     |    4 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |    2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   31 +-
 .../GridNearAtomicSingleUpdateFuture.java       |    1 -
 .../colocated/GridDhtColocatedLockFuture.java   |    2 +-
 .../preloader/CacheGroupAffinityMessage.java    |  274 +++
 .../preloader/ForceRebalanceExchangeTask.java   |    5 +
 .../preloader/GridDhtPartitionExchangeId.java   |   11 +
 .../dht/preloader/GridDhtPartitionMap.java      |    2 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |    2 +-
 .../GridDhtPartitionsAbstractMessage.java       |   37 +-
 .../GridDhtPartitionsExchangeFuture.java        | 1974 +++++++++++++-----
 .../preloader/GridDhtPartitionsFullMessage.java |  161 +-
 .../GridDhtPartitionsSingleMessage.java         |   78 +-
 .../GridDhtPartitionsSingleRequest.java         |   47 +-
 .../dht/preloader/GridDhtPreloader.java         |   34 +-
 .../IgniteDhtPartitionCountersMap.java          |    7 +
 .../dht/preloader/InitNewCoordinatorFuture.java |  307 +++
 .../RebalanceReassignExchangeTask.java          |    5 +
 .../distributed/near/GridNearCacheAdapter.java  |    2 +-
 .../distributed/near/GridNearGetFuture.java     |    4 +-
 .../distributed/near/GridNearLockFuture.java    |    2 +-
 ...arOptimisticSerializableTxPrepareFuture.java |    1 +
 .../near/GridNearOptimisticTxPrepareFuture.java |    1 +
 .../GridNearPessimisticTxPrepareFuture.java     |    1 +
 .../near/GridNearTxPrepareRequest.java          |   14 +
 .../GridCacheDatabaseSharedManager.java         |    7 +-
 .../cache/query/GridCacheQueryAdapter.java      |    4 +-
 .../cache/transactions/IgniteTxAdapter.java     |    2 +-
 .../cache/transactions/IgniteTxHandler.java     |  184 +-
 .../closure/GridClosureProcessor.java           |   36 +-
 .../cluster/GridClusterStateProcessor.java      |    2 +-
 .../datastreamer/DataStreamProcessor.java       |   57 +-
 .../datastreamer/DataStreamerImpl.java          |   65 +-
 .../datastreamer/PlatformDataStreamer.java      |    3 +-
 .../query/schema/SchemaExchangeWorkerTask.java  |    5 +
 .../SchemaNodeLeaveExchangeWorkerTask.java      |    5 +
 .../processors/task/GridTaskWorker.java         |    8 +-
 .../org/apache/ignite/thread/IgniteThread.java  |    9 +
 .../internal/TestDelayingCommunicationSpi.java  |   63 +
 ...CacheExchangeMessageDuplicatedStateTest.java |    9 +-
 .../IgniteClientCacheStartFailoverTest.java     |    4 +-
 .../IgniteClusterActivateDeactivateTest.java    |    4 +-
 .../cache/IgniteDynamicCacheStartSelfTest.java  |   26 +-
 ...niteTopologyValidatorGridSplitCacheTest.java |    6 +-
 ...AffinityCoordinatorDynamicStartStopTest.java |    2 +-
 ...eAbstractDataStructuresFailoverSelfTest.java |    7 +-
 .../distributed/CacheExchangeMergeTest.java     | 1530 ++++++++++++++
 .../CacheLateAffinityAssignmentTest.java        |  598 ++++--
 ...CacheLoadingConcurrentGridStartSelfTest.java |   11 +
 .../CacheLockReleaseNodeLeaveTest.java          |   13 +-
 .../distributed/CachePartitionStateTest.java    |   18 +-
 ...ncurrentGridStartSelfTestAllowOverwrite.java |   33 +
 ...niteCacheClientNodeChangingTopologyTest.java |    5 +-
 ...teCacheClientNodePartitionsExchangeTest.java |   52 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |    4 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   36 +-
 .../IgniteChangeGlobalStateTest.java            |   11 +-
 .../join/JoinInActiveNodeToActiveCluster.java   |    4 +-
 .../junits/common/GridCommonAbstractTest.java   |   22 +-
 .../testsuites/IgniteCacheTestSuite2.java       |    7 +-
 .../testsuites/IgniteCacheTestSuite6.java       |    3 +
 .../cache/WaitMapExchangeFinishCallable.java    |    4 +-
 96 files changed, 7264 insertions(+), 1469 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 9762586,3d25084..a597227
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@@ -439,9 -457,9 +457,9 @@@ public class CacheAffinitySharedManager
                              GridClientPartitionTopology clientTop = 
cctx.exchange().clearClientTopology(grp.groupId());
  
                              if (clientTop != null) {
-                                 grp.topology().update(topVer,
+                                 
grp.topology().update(grpHolder.affinity().lastVersion(),
                                      clientTop.partitionMap(true),
 -                                    clientTop.updateCounters(false),
 +                                    clientTop.fullUpdateCounters(),
                                      Collections.<Integer>emptySet(),
                                      null);
                              }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 27fda14,48909d4..70f1dfa
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -668,21 -723,23 +723,21 @@@ public class GridCachePartitionExchange
          if (top != null)
              return top;
  
-         CacheGroupDescriptor grpDesc = 
cctx.cache().cacheGroupDescriptors().get(grpId);
 -        Object affKey = null;
 -
+         CacheGroupDescriptor grpDesc = 
cctx.affinity().cacheGroups().get(grpId);
  
-         assert grpDesc != null: "Failed for exchange: " + exchFut;
 -        if (grpDesc != null) {
 -            CacheConfiguration<?, ?> ccfg = grpDesc.config();
++        assert grpDesc != null;
  
 -            AffinityFunction aff = ccfg.getAffinity();
 +        CacheConfiguration<?, ?> ccfg = grpDesc.config();
  
 -            affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
 -                ccfg.getNodeFilter(),
 -                ccfg.getBackups(),
 -                aff.partitions());
 -        }
 +        AffinityFunction aff = ccfg.getAffinity();
 +
 +        Object affKey = 
cctx.kernalContext().affinity().similaryAffinityKey(aff,
 +            ccfg.getNodeFilter(),
 +            ccfg.getBackups(),
 +            aff.partitions());
  
          GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
-             top = new GridClientPartitionTopology(cctx, grpId, exchFut, 
aff.partitions(), affKey));
 -            top = new GridClientPartitionTopology(cctx, grpId, affKey));
++            top = new GridClientPartitionTopology(cctx, grpId, 
aff.partitions(), affKey));
  
          return old != null ? old : top;
      }
@@@ -1074,11 -1118,9 +1116,10 @@@
       * @param id Exchange ID.
       */
      private void sendLocalPartitions(ClusterNode node, @Nullable 
GridDhtPartitionExchangeId id) {
-         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
-             id,
+         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
              cctx.kernalContext().clientNode(),
 -            false);
 +            false,
 +            null);
  
          if (log.isDebugEnabled())
              log.debug("Sending local partitions [nodeId=" + node.id() + ", 
msg=" + m + ']');
@@@ -1103,13 -1144,10 +1143,12 @@@
       * @param sndCounters {@code True} if need send partition update counters.
       * @return Message.
       */
 -    public GridDhtPartitionsSingleMessage 
createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId,
 +    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
-         ClusterNode targetNode,
 +        @Nullable GridDhtPartitionExchangeId exchangeId,
          boolean clientOnlyExchange,
 -        boolean sndCounters)
 -    {
 +        boolean sndCounters,
 +        ExchangeActions exchActions
 +    ) {
          GridDhtPartitionsSingleMessage m = new 
GridDhtPartitionsSingleMessage(exchangeId,
              clientOnlyExchange,
              cctx.versions().last(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f844e19,745e7d7..77792c7
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -36,9 -36,9 +36,11 @@@ import org.apache.ignite.internal.Ignit
  import org.apache.ignite.internal.managers.discovery.DiscoCache;
  import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+ import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
  import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
 +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@@ -116,15 -117,11 +118,13 @@@ public class GridClientPartitionTopolog
      /**
       * @param cctx Context.
       * @param grpId Group ID.
-      * @param exchFut Exchange ID.
 +     * @param parts Number of partitions in the group.
       * @param similarAffKey Key to find caches with similar affinity.
       */
      public GridClientPartitionTopology(
-         GridCacheSharedContext cctx,
+         GridCacheSharedContext<?, ?> cctx,
          int grpId,
-         GridDhtPartitionsExchangeFuture exchFut,
 +        int parts,
          Object similarAffKey
      ) {
          this.cctx = cctx;
@@@ -137,16 -132,9 +135,11 @@@
  
          log = cctx.logger(getClass());
  
-         lock.writeLock().lock();
+         node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
+             cctx.localNode().order(),
+             updateSeq.get());
 +
-         try {
-             cntrMap = new CachePartitionFullCountersMap(parts);
- 
-             beforeExchange0(cctx.localNode(), exchFut);
-         }
-         finally {
-             lock.writeLock().unlock();
-         }
++        cntrMap = new CachePartitionFullCountersMap(parts);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4608977,0dea5e4..22205ea
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@@ -255,9 -263,9 +263,9 @@@ public interface GridDhtPartitionTopolo
       * @return {@code True} if local state was changed.
       */
      public boolean update(
-         @Nullable AffinityTopologyVersion exchangeVer,
+         @Nullable AffinityTopologyVersion exchangeResVer,
          GridDhtPartitionFullMap partMap,
 -        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
 +        @Nullable CachePartitionFullCountersMap cntrMap,
          Set<Integer> partsToReload,
          @Nullable AffinityTopologyVersion msgTopVer);
  
@@@ -267,19 -276,13 +276,20 @@@
       * @return {@code True} if local state was changed.
       */
      public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
-         GridDhtPartitionMap parts);
+         GridDhtPartitionMap parts,
+         boolean force);
  
      /**
 +     * Collects update counters collected during exchange. Called on 
coordinator.
 +     *
       * @param cntrMap Counters map.
       */
 -    public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
 +    public void collectUpdateCounters(CachePartitionPartialCountersMap 
cntrMap);
 +
 +    /**
 +     * Applies update counters collected during exchange on coordinator. 
Called on coordinator.
 +     */
 +    public void applyUpdateCounters();
  
      /**
       * Checks if there is at least one owner for each partition in the cache 
topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 84ae9af,72ab8c8..23e28b0
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -41,9 -41,8 +41,10 @@@ import org.apache.ignite.internal.manag
  import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+ import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
  import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
 +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@@ -1252,8 -1312,15 +1311,8 @@@ public class GridDhtPartitionTopologyIm
                      if (state == OWNING) {
                          GridDhtLocalPartition locPart = locParts.get(p);
  
-                         assert locPart != null;
+                         assert locPart != null : grp.cacheOrGroupName();
  
 -                        if (incomeCntrMap != null) {
 -                            T2<Long, Long> cntr = incomeCntrMap.get(p);
 -
 -                            if (cntr != null && cntr.get2() > 
locPart.updateCounter())
 -                                locPart.updateCounter(cntr.get2());
 -                        }
 -
                          if (locPart.state() == MOVING) {
                              boolean success = locPart.own();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index cae3ce2,95c1a4f..84cc792
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@@ -90,6 -104,19 +102,13 @@@ public abstract class GridDhtPartitions
      }
  
      /**
+      * @param exchId Exchange ID.
+      */
+     public void exchangeId(GridDhtPartitionExchangeId exchId) {
+         this.exchId = exchId;
+     }
+ 
+     /**
 -     * @param grpId Cache group ID.
 -     * @return Parition update counters.
 -     */
 -    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int 
grpId);
 -
 -    /**
       * @return Last used version among all nodes.
       */
      @Nullable public GridCacheVersion lastVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8530a23,07f36af..861ab38
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -622,9 -762,9 +762,9 @@@ public class GridDhtPartitionsExchangeF
                  boolean updateTop = 
exchId.topologyVersion().equals(grp.localStartVersion());
  
                  if (updateTop && clientTop != null) {
-                     top.update(topologyVersion(),
+                     top.update(null,
                          clientTop.partitionMap(true),
 -                        clientTop.updateCounters(false),
 +                        clientTop.fullUpdateCounters(),
                          Collections.<Integer>emptySet(),
                          null);
                  }
@@@ -1129,17 -1228,15 +1228,16 @@@
                  true);
          }
          else {
-             msg = cctx.exchange().createPartitionsSingleMessage(node,
-                 exchangeId(),
+             msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
                  false,
 -                true);
 +                true,
 +                exchActions);
-         }
  
-         Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
+             Map<Integer, Map<Integer, Long>> partHistReserved0 = 
partHistReserved;
  
-         if (partHistReserved0 != null)
-             msg.partitionHistoryCounters(partHistReserved0);
+             if (partHistReserved0 != null)
+                 msg.partitionHistoryCounters(partHistReserved0);
+         }
  
          if (stateChangeExchange() && changeGlobalStateE != null)
              msg.setError(changeGlobalStateE);
@@@ -1720,34 -2155,102 +2158,107 @@@
                  }
              }
  
-             for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
-                 if (msg instanceof GridDhtPartitionsSingleMessage) {
-                     GridDhtPartitionsSingleMessage msg0 = 
(GridDhtPartitionsSingleMessage)msg;
+             if (exchCtx.mergeExchanges()) {
+                 log.info("Coordinator received all messages, try merge [ver=" 
+ initialVersion() + ']');
+ 
+                 boolean finish = 
cctx.exchange().mergeExchangesOnCoordinator(this);
+ 
+                 if (!finish)
+                     return;
+             }
+ 
+             finishExchangeOnCoordinator(sndResNodes);
+         }
+         catch (IgniteCheckedException e) {
+             if (reconnectOnError(e))
+                 onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+             else
+                 onDone(e);
+         }
+     }
+ 
+     /**
+      * @param sndResNodes Additional nodes to send finish message to.
+      */
+     private void finishExchangeOnCoordinator(@Nullable 
Collection<ClusterNode> sndResNodes) {
+         try {
+             AffinityTopologyVersion resTopVer = 
exchCtx.events().topologyVersion();
  
-                     for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg0.partitions().entrySet()) {
-                         Integer grpId = entry.getKey();
-                         CacheGroupContext grp = 
cctx.cache().cacheGroup(grpId);
+             log.info("finishExchangeOnCoordinator [topVer=" + 
initialVersion() +
+                 ", resVer=" + resTopVer + ']');
  
-                         GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
-                             cctx.exchange().clientTopology(grpId, this);
+             Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
  
-                         CachePartitionPartialCountersMap cntrs = 
msg0.partitionUpdateCounters(grpId);
+             if (exchCtx.mergeExchanges()) {
+                 synchronized (mux) {
+                     if (mergedJoinExchMsgs != null) {
+                         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> 
e : mergedJoinExchMsgs.entrySet()) {
+                             msgs.put(e.getKey(), e.getValue());
  
-                         if (cntrs != null)
-                             top.collectUpdateCounters(cntrs);
+                             updatePartitionSingleMap(e.getKey(), 
e.getValue());
+                         }
                      }
                  }
+ 
+                 assert exchCtx.events().hasServerJoin() || 
exchCtx.events().hasServerLeft();
+ 
+                 exchCtx.events().processEvents(this);
+ 
+                 if (exchCtx.events().hasServerLeft())
+                     idealAffDiff = 
cctx.affinity().onServerLeftWithExchangeMergeProtocol(this);
+                 else
+                     
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
+ 
+                 for (CacheGroupDescriptor desc : 
cctx.affinity().cacheGroups().values()) {
+                     if (desc.config().getCacheMode() == CacheMode.LOCAL)
+                         continue;
+ 
+                     CacheGroupContext grp = 
cctx.cache().cacheGroup(desc.groupId());
+ 
+                     GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
+                         cctx.exchange().clientTopology(desc.groupId());
+ 
+                     top.beforeExchange(this, true, true);
+                 }
+             }
+ 
+             Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
+ 
+             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : 
msgs.entrySet()) {
+                 GridDhtPartitionsSingleMessage msg = e.getValue();
+ 
+                 // Apply update counters after all single messages are 
received.
+                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : 
msg.partitions().entrySet()) {
+                     Integer grpId = entry.getKey();
+ 
+                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+ 
+                     GridDhtPartitionTopology top = grp != null ? 
grp.topology() :
+                         cctx.exchange().clientTopology(grpId);
+ 
 -                    Map<Integer, T2<Long, Long>> cntrs = 
msg.partitionUpdateCounters(grpId);
++                        CachePartitionPartialCountersMap cntrs = 
msg.partitionUpdateCounters(grpId);
+ 
+                     if (cntrs != null)
 -                        top.applyUpdateCounters(cntrs);
++                        top.collectUpdateCounters(cntrs);
+                 }
+ 
+                 Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
+ 
+                 if (affReq != null) {
+                     joinedNodeAff = 
CacheGroupAffinityMessage.createAffinityMessages(cctx,
+                         resTopVer,
+                         affReq,
+                         joinedNodeAff);
+                 }
              }
  
 +            for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
 +                if (!grpCtx.isLocal())
 +                    grpCtx.topology().applyUpdateCounters();
 +            }
 +
-             if (discoEvt.type() == EVT_NODE_JOINED)
-                 assignPartitionsStates();
-             else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-                 assert discoEvt instanceof DiscoveryCustomEvent;
+             if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                 assert firstDiscoEvt instanceof DiscoveryCustomEvent;
  
                  if (activateCluster())
                      assignPartitionsStates();

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 5f415e2,a164e85..2bb19cd
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@@ -32,6 -32,8 +32,7 @@@ import org.apache.ignite.internal.proce
  import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+ import org.apache.ignite.internal.util.typedef.F;
 -import org.apache.ignite.internal.util.typedef.T2;
  import org.apache.ignite.internal.util.typedef.internal.S;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 3c11fa7,bc7d314..44815ca
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@@ -17,12 -17,14 +17,14 @@@
  
  package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
  
+ import java.util.Collection;
 -import java.util.Map;
 -import java.util.HashMap;
 +import java.io.Externalizable;
  import java.nio.ByteBuffer;
  import java.util.Collections;
 -import java.io.Externalizable;
 +import java.util.HashMap;
 +import java.util.Map;
  import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.internal.GridDirectCollection;
  import org.apache.ignite.internal.GridDirectMap;
  import org.apache.ignite.internal.GridDirectTransient;
  import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 75cfb37,dc2fbf8..e7954d9
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@@ -30,9 -32,16 +30,16 @@@ public class IgniteDhtPartitionCounters
      private static final long serialVersionUID = 0L;
  
      /** */
 -    private Map<Integer, Map<Integer, T2<Long, Long>>> map;
 +    private Map<Integer, CachePartitionFullCountersMap> map;
  
      /**
+      * @return {@code True} if map is empty.
+      */
+     public synchronized boolean empty() {
+         return map == null || map.isEmpty();
+     }
+ 
+     /**
       * @param cacheId Cache ID.
       * @param cntrMap Counters map.
       */

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
----------------------------------------------------------------------

Reply via email to