Repository: ignite Updated Branches: refs/heads/ignite-5872-5578 [created] 30339d35c
Merge remote-tracking branch 'apache/ignite-5578' into ignite-5872-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30339d35 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30339d35 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30339d35 Branch: refs/heads/ignite-5872-5578 Commit: 30339d35c9bba365db0369ca8d892efb3a2b3a96 Parents: 855ece3 bc9a416 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Thu Aug 10 17:09:53 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu Aug 10 17:09:53 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../java/org/apache/ignite/TestDebugLog.java | 219 ++ .../internal/IgniteDiagnosticMessage.java | 2 +- .../communication/GridIoMessageFactory.java | 9 +- .../internal/managers/discovery/DiscoCache.java | 79 +- .../discovery/GridDiscoveryManager.java | 28 +- .../affinity/AffinityTopologyVersion.java | 7 + .../affinity/GridAffinityAssignmentCache.java | 42 + .../affinity/GridAffinityProcessor.java | 8 +- .../cache/CacheAffinitySharedManager.java | 688 ++++-- .../processors/cache/CacheGroupContext.java | 18 +- .../cache/CachePartitionExchangeWorkerTask.java | 5 +- .../ClientCacheChangeDummyDiscoveryMessage.java | 5 + .../cache/ClientCacheUpdateTimeout.java | 5 + .../processors/cache/ClusterCachesInfo.java | 22 +- .../processors/cache/ExchangeContext.java | 131 ++ .../cache/ExchangeDiscoveryEvents.java | 262 +++ .../processors/cache/GridCacheAdapter.java | 8 +- .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheIoManager.java | 57 +- .../processors/cache/GridCacheMapEntry.java | 6 +- .../GridCachePartitionExchangeManager.java | 413 +++- .../processors/cache/GridCacheProcessor.java | 14 +- .../dht/ClientCacheDhtTopologyFuture.java | 12 +- .../dht/GridClientPartitionTopology.java | 132 +- .../distributed/dht/GridDhtCacheAdapter.java | 18 +- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../dht/GridDhtPartitionTopology.java | 37 +- .../dht/GridDhtPartitionTopologyImpl.java | 492 +++-- .../dht/GridDhtPartitionsReservation.java | 2 +- .../distributed/dht/GridDhtTopologyFuture.java | 36 +- .../dht/GridDhtTransactionalCacheAdapter.java | 95 +- .../dht/GridPartitionedGetFuture.java | 4 +- .../dht/GridPartitionedSingleGetFuture.java | 4 +- .../GridDhtAtomicAbstractUpdateFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 31 +- .../GridNearAtomicSingleUpdateFuture.java | 1 - .../colocated/GridDhtColocatedLockFuture.java | 2 +- .../preloader/CacheGroupAffinityMessage.java | 274 +++ .../preloader/ForceRebalanceExchangeTask.java | 5 + .../preloader/GridDhtPartitionExchangeId.java | 11 + .../dht/preloader/GridDhtPartitionMap.java | 2 +- .../dht/preloader/GridDhtPartitionSupplier.java | 2 +- .../GridDhtPartitionsAbstractMessage.java | 37 +- .../GridDhtPartitionsExchangeFuture.java | 1974 +++++++++++++----- .../preloader/GridDhtPartitionsFullMessage.java | 161 +- .../GridDhtPartitionsSingleMessage.java | 78 +- .../GridDhtPartitionsSingleRequest.java | 47 +- .../dht/preloader/GridDhtPreloader.java | 34 +- .../IgniteDhtPartitionCountersMap.java | 7 + .../dht/preloader/InitNewCoordinatorFuture.java | 307 +++ .../RebalanceReassignExchangeTask.java | 5 + .../distributed/near/GridNearCacheAdapter.java | 2 +- .../distributed/near/GridNearGetFuture.java | 4 +- .../distributed/near/GridNearLockFuture.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 1 + .../near/GridNearOptimisticTxPrepareFuture.java | 1 + .../GridNearPessimisticTxPrepareFuture.java | 1 + .../near/GridNearTxPrepareRequest.java | 14 + .../GridCacheDatabaseSharedManager.java | 7 +- .../cache/query/GridCacheQueryAdapter.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 184 +- .../closure/GridClosureProcessor.java | 36 +- .../cluster/GridClusterStateProcessor.java | 2 +- .../datastreamer/DataStreamProcessor.java | 57 +- .../datastreamer/DataStreamerImpl.java | 65 +- .../datastreamer/PlatformDataStreamer.java | 3 +- .../query/schema/SchemaExchangeWorkerTask.java | 5 + .../SchemaNodeLeaveExchangeWorkerTask.java | 5 + .../processors/task/GridTaskWorker.java | 8 +- .../org/apache/ignite/thread/IgniteThread.java | 9 + .../internal/TestDelayingCommunicationSpi.java | 63 + ...CacheExchangeMessageDuplicatedStateTest.java | 9 +- .../IgniteClientCacheStartFailoverTest.java | 4 +- .../IgniteClusterActivateDeactivateTest.java | 4 +- .../cache/IgniteDynamicCacheStartSelfTest.java | 26 +- ...niteTopologyValidatorGridSplitCacheTest.java | 6 +- ...AffinityCoordinatorDynamicStartStopTest.java | 2 +- ...eAbstractDataStructuresFailoverSelfTest.java | 7 +- .../distributed/CacheExchangeMergeTest.java | 1530 ++++++++++++++ .../CacheLateAffinityAssignmentTest.java | 598 ++++-- ...CacheLoadingConcurrentGridStartSelfTest.java | 11 + .../CacheLockReleaseNodeLeaveTest.java | 13 +- .../distributed/CachePartitionStateTest.java | 18 +- ...ncurrentGridStartSelfTestAllowOverwrite.java | 33 + ...niteCacheClientNodeChangingTopologyTest.java | 5 +- ...teCacheClientNodePartitionsExchangeTest.java | 52 +- ...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +- ...eAtomicInvalidPartitionHandlingSelfTest.java | 36 +- .../IgniteChangeGlobalStateTest.java | 11 +- .../join/JoinInActiveNodeToActiveCluster.java | 4 +- .../junits/common/GridCommonAbstractTest.java | 22 +- .../testsuites/IgniteCacheTestSuite2.java | 7 +- .../testsuites/IgniteCacheTestSuite6.java | 3 + .../cache/WaitMapExchangeFinishCallable.java | 4 +- 96 files changed, 7264 insertions(+), 1469 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 9762586,3d25084..a597227 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@@ -439,9 -457,9 +457,9 @@@ public class CacheAffinitySharedManager GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId()); if (clientTop != null) { - grp.topology().update(topVer, + grp.topology().update(grpHolder.affinity().lastVersion(), clientTop.partitionMap(true), - clientTop.updateCounters(false), + clientTop.fullUpdateCounters(), Collections.<Integer>emptySet(), null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 27fda14,48909d4..70f1dfa --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -668,21 -723,23 +723,21 @@@ public class GridCachePartitionExchange if (top != null) return top; - CacheGroupDescriptor grpDesc = cctx.cache().cacheGroupDescriptors().get(grpId); - Object affKey = null; - + CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId); - assert grpDesc != null: "Failed for exchange: " + exchFut; - if (grpDesc != null) { - CacheConfiguration<?, ?> ccfg = grpDesc.config(); ++ assert grpDesc != null; - AffinityFunction aff = ccfg.getAffinity(); + CacheConfiguration<?, ?> ccfg = grpDesc.config(); - affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff, - ccfg.getNodeFilter(), - ccfg.getBackups(), - aff.partitions()); - } + AffinityFunction aff = ccfg.getAffinity(); + + Object affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff, + ccfg.getNodeFilter(), + ccfg.getBackups(), + aff.partitions()); GridClientPartitionTopology old = clientTops.putIfAbsent(grpId, - top = new GridClientPartitionTopology(cctx, grpId, exchFut, aff.partitions(), affKey)); - top = new GridClientPartitionTopology(cctx, grpId, affKey)); ++ top = new GridClientPartitionTopology(cctx, grpId, aff.partitions(), affKey)); return old != null ? old : top; } @@@ -1074,11 -1118,9 +1116,10 @@@ * @param id Exchange ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, - id, + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), - false); + false, + null); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); @@@ -1103,13 -1144,10 +1143,12 @@@ * @param sndCounters {@code True} if need send partition update counters. * @return Message. */ - public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId, + public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( - ClusterNode targetNode, + @Nullable GridDhtPartitionExchangeId exchangeId, boolean clientOnlyExchange, - boolean sndCounters) - { + boolean sndCounters, + ExchangeActions exchActions + ) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, clientOnlyExchange, cctx.versions().last(), http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index f844e19,745e7d7..77792c7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -36,9 -36,9 +36,11 @@@ import org.apache.ignite.internal.Ignit import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; + import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@@ -116,15 -117,11 +118,13 @@@ public class GridClientPartitionTopolog /** * @param cctx Context. * @param grpId Group ID. - * @param exchFut Exchange ID. + * @param parts Number of partitions in the group. * @param similarAffKey Key to find caches with similar affinity. */ public GridClientPartitionTopology( - GridCacheSharedContext cctx, + GridCacheSharedContext<?, ?> cctx, int grpId, - GridDhtPartitionsExchangeFuture exchFut, + int parts, Object similarAffKey ) { this.cctx = cctx; @@@ -137,16 -132,9 +135,11 @@@ log = cctx.logger(getClass()); - lock.writeLock().lock(); + node2part = new GridDhtPartitionFullMap(cctx.localNode().id(), + cctx.localNode().order(), + updateSeq.get()); + - try { - cntrMap = new CachePartitionFullCountersMap(parts); - - beforeExchange0(cctx.localNode(), exchFut); - } - finally { - lock.writeLock().unlock(); - } ++ cntrMap = new CachePartitionFullCountersMap(parts); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 4608977,0dea5e4..22205ea --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@@ -255,9 -263,9 +263,9 @@@ public interface GridDhtPartitionTopolo * @return {@code True} if local state was changed. */ public boolean update( - @Nullable AffinityTopologyVersion exchangeVer, + @Nullable AffinityTopologyVersion exchangeResVer, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> cntrMap, + @Nullable CachePartitionFullCountersMap cntrMap, Set<Integer> partsToReload, @Nullable AffinityTopologyVersion msgTopVer); @@@ -267,19 -276,13 +276,20 @@@ * @return {@code True} if local state was changed. */ public boolean update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts); + GridDhtPartitionMap parts, + boolean force); /** + * Collects update counters collected during exchange. Called on coordinator. + * * @param cntrMap Counters map. */ - public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap); + public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap); + + /** + * Applies update counters collected during exchange on coordinator. Called on coordinator. + */ + public void applyUpdateCounters(); /** * Checks if there is at least one owner for each partition in the cache topology. http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 84ae9af,72ab8c8..23e28b0 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -41,9 -41,8 +41,10 @@@ import org.apache.ignite.internal.manag import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; + import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@@ -1252,8 -1312,15 +1311,8 @@@ public class GridDhtPartitionTopologyIm if (state == OWNING) { GridDhtLocalPartition locPart = locParts.get(p); - assert locPart != null; + assert locPart != null : grp.cacheOrGroupName(); - if (incomeCntrMap != null) { - T2<Long, Long> cntr = incomeCntrMap.get(p); - - if (cntr != null && cntr.get2() > locPart.updateCounter()) - locPart.updateCounter(cntr.get2()); - } - if (locPart.state() == MOVING) { boolean success = locPart.own(); http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index cae3ce2,95c1a4f..84cc792 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@@ -90,6 -104,19 +102,13 @@@ public abstract class GridDhtPartitions } /** + * @param exchId Exchange ID. + */ + public void exchangeId(GridDhtPartitionExchangeId exchId) { + this.exchId = exchId; + } + + /** - * @param grpId Cache group ID. - * @return Parition update counters. - */ - public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId); - - /** * @return Last used version among all nodes. */ @Nullable public GridCacheVersion lastVersion() { http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8530a23,07f36af..861ab38 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -622,9 -762,9 +762,9 @@@ public class GridDhtPartitionsExchangeF boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) { - top.update(topologyVersion(), + top.update(null, clientTop.partitionMap(true), - clientTop.updateCounters(false), + clientTop.fullUpdateCounters(), Collections.<Integer>emptySet(), null); } @@@ -1129,17 -1228,15 +1228,16 @@@ true); } else { - msg = cctx.exchange().createPartitionsSingleMessage(node, - exchangeId(), + msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(), false, - true); + true, + exchActions); - } - Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; + Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; - if (partHistReserved0 != null) - msg.partitionHistoryCounters(partHistReserved0); + if (partHistReserved0 != null) + msg.partitionHistoryCounters(partHistReserved0); + } if (stateChangeExchange() && changeGlobalStateE != null) msg.setError(changeGlobalStateE); @@@ -1720,34 -2155,102 +2158,107 @@@ } } - for (GridDhtPartitionsAbstractMessage msg : msgs.values()) { - if (msg instanceof GridDhtPartitionsSingleMessage) { - GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg; + if (exchCtx.mergeExchanges()) { + log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']'); + + boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this); + + if (!finish) + return; + } + + finishExchangeOnCoordinator(sndResNodes); + } + catch (IgniteCheckedException e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); + } + } + + /** + * @param sndResNodes Additional nodes to send finish message to. + */ + private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) { + try { + AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion(); - for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) { - Integer grpId = entry.getKey(); - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + log.info("finishExchangeOnCoordinator [topVer=" + initialVersion() + + ", resVer=" + resTopVer + ']'); - GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(grpId, this); + Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null; - CachePartitionPartialCountersMap cntrs = msg0.partitionUpdateCounters(grpId); + if (exchCtx.mergeExchanges()) { + synchronized (mux) { + if (mergedJoinExchMsgs != null) { + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) { + msgs.put(e.getKey(), e.getValue()); - if (cntrs != null) - top.collectUpdateCounters(cntrs); + updatePartitionSingleMap(e.getKey(), e.getValue()); + } } } + + assert exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft(); + + exchCtx.events().processEvents(this); + + if (exchCtx.events().hasServerLeft()) + idealAffDiff = cctx.affinity().onServerLeftWithExchangeMergeProtocol(this); + else + cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true); + + for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) { + if (desc.config().getCacheMode() == CacheMode.LOCAL) + continue; + + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(desc.groupId()); + + top.beforeExchange(this, true, true); + } + } + + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { + GridDhtPartitionsSingleMessage msg = e.getValue(); + + // Apply update counters after all single messages are received. + for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { + Integer grpId = entry.getKey(); + + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(grpId); + - Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId); ++ CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId); + + if (cntrs != null) - top.applyUpdateCounters(cntrs); ++ top.collectUpdateCounters(cntrs); + } + + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + + if (affReq != null) { + joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, + resTopVer, + affReq, + joinedNodeAff); + } } + for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) { + if (!grpCtx.isLocal()) + grpCtx.topology().applyUpdateCounters(); + } + - if (discoEvt.type() == EVT_NODE_JOINED) - assignPartitionsStates(); - else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { - assert discoEvt instanceof DiscoveryCustomEvent; + if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + assert firstDiscoEvt instanceof DiscoveryCustomEvent; if (activateCluster()) assignPartitionsStates(); http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 5f415e2,a164e85..2bb19cd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@@ -32,6 -32,8 +32,7 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; + import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 3c11fa7,bc7d314..44815ca --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@@ -17,12 -17,14 +17,14 @@@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + import java.util.Collection; -import java.util.Map; -import java.util.HashMap; +import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collections; -import java.io.Externalizable; +import java.util.HashMap; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java index 75cfb37,dc2fbf8..e7954d9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java @@@ -30,9 -32,16 +30,16 @@@ public class IgniteDhtPartitionCounters private static final long serialVersionUID = 0L; /** */ - private Map<Integer, Map<Integer, T2<Long, Long>>> map; + private Map<Integer, CachePartitionFullCountersMap> map; /** + * @return {@code True} if map is empty. + */ + public synchronized boolean empty() { + return map == null || map.isEmpty(); + } + + /** * @param cacheId Cache ID. * @param cntrMap Counters map. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30339d35/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java ----------------------------------------------------------------------