Merge remote-tracking branch 'remotes/community/ignite-5075' into ignite-5075-pds
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08404350 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08404350 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08404350 Branch: refs/heads/ignite-5075-pds Commit: 08404350a6cfbf841638a40e5b96823bc06f87e3 Parents: 09e0bc2 db7a809 Author: sboikov <[email protected]> Authored: Mon May 22 12:58:11 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 22 12:58:11 2017 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 13 + .../managers/communication/GridIoManager.java | 10 +- .../internal/managers/discovery/DiscoCache.java | 30 +- .../discovery/GridDiscoveryManager.java | 144 +- .../affinity/GridAffinityAssignmentCache.java | 38 +- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/affinity/GridAffinityUtils.java | 2 +- .../cache/CacheAffinitySharedManager.java | 604 ++--- .../CacheClientReconnectDiscoveryData.java | 62 +- .../internal/processors/cache/CacheData.java | 18 +- .../processors/cache/CacheGroupData.java | 142 ++ .../processors/cache/CacheGroupDescriptor.java | 210 ++ .../cache/CacheGroupInfrastructure.java | 783 ++++++ .../processors/cache/CacheMetricsImpl.java | 10 +- .../cache/CacheNodeCommonDiscoveryData.java | 33 + .../processors/cache/ClusterCachesInfo.java | 364 ++- .../cache/ClusterCachesReconnectResult.java | 75 + .../cache/DynamicCacheDescriptor.java | 26 + .../processors/cache/ExchangeActions.java | 86 +- .../processors/cache/GridCacheAdapter.java | 51 +- .../cache/GridCacheAffinityManager.java | 64 +- .../processors/cache/GridCacheAttributes.java | 14 + .../cache/GridCacheClearAllRunnable.java | 2 +- .../cache/GridCacheConcurrentMap.java | 22 +- .../cache/GridCacheConcurrentMapImpl.java | 132 +- .../processors/cache/GridCacheContext.java | 145 +- .../processors/cache/GridCacheEntryInfo.java | 36 +- .../processors/cache/GridCacheEventManager.java | 36 - .../cache/GridCacheGroupIdMessage.java | 110 + .../processors/cache/GridCacheIdMessage.java | 117 + .../processors/cache/GridCacheIoManager.java | 288 ++- .../cache/GridCacheLocalConcurrentMap.java | 42 +- .../processors/cache/GridCacheMapEntry.java | 18 +- .../processors/cache/GridCacheMessage.java | 97 +- .../GridCachePartitionExchangeManager.java | 289 +-- .../processors/cache/GridCachePreloader.java | 23 +- .../cache/GridCachePreloaderAdapter.java | 46 +- .../processors/cache/GridCacheProcessor.java | 373 ++- .../cache/GridCacheSharedContext.java | 4 +- .../processors/cache/GridCacheTtlManager.java | 22 +- .../processors/cache/GridCacheUtils.java | 42 +- .../GridChangeGlobalStateMessageResponse.java | 20 +- .../processors/cache/GridNoStorageCacheMap.java | 26 +- .../cache/IgniteCacheOffheapManager.java | 122 +- .../cache/IgniteCacheOffheapManagerImpl.java | 1164 ++++++--- .../cache/affinity/GridCacheAffinityImpl.java | 9 +- .../processors/cache/database/CacheDataRow.java | 5 - .../cache/database/CacheDataRowAdapter.java | 47 +- .../cache/database/CacheSearchRow.java | 5 + .../processors/cache/database/RowStore.java | 16 +- .../cache/database/tree/BPlusTree.java | 39 +- .../cache/database/tree/io/PageIO.java | 24 + .../distributed/GridCacheTtlUpdateRequest.java | 4 +- .../distributed/GridDistributedBaseMessage.java | 4 +- .../GridDistributedCacheAdapter.java | 11 +- .../GridDistributedTxFinishResponse.java | 28 +- .../dht/GridCachePartitionedConcurrentMap.java | 72 +- .../dht/GridClientPartitionTopology.java | 25 +- .../dht/GridDhtAffinityAssignmentRequest.java | 10 +- .../dht/GridDhtAffinityAssignmentResponse.java | 12 +- .../dht/GridDhtAssignmentFetchFuture.java | 22 +- .../distributed/dht/GridDhtCacheAdapter.java | 338 ++- .../distributed/dht/GridDhtCacheEntry.java | 6 +- .../cache/distributed/dht/GridDhtGetFuture.java | 8 +- .../distributed/dht/GridDhtGetSingleFuture.java | 11 +- .../distributed/dht/GridDhtLocalPartition.java | 366 ++- .../distributed/dht/GridDhtLockResponse.java | 2 +- .../dht/GridDhtPartitionTopology.java | 9 +- .../dht/GridDhtPartitionTopologyImpl.java | 272 +- .../dht/GridDhtTransactionalCacheAdapter.java | 37 +- .../dht/GridDhtTxFinishResponse.java | 14 +- .../dht/GridDhtTxOnePhaseCommitAckRequest.java | 16 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 2 +- .../GridDhtAtomicAbstractUpdateRequest.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 60 +- .../dht/atomic/GridDhtAtomicCacheEntry.java | 53 - .../GridDhtAtomicDeferredUpdateResponse.java | 4 +- .../dht/atomic/GridDhtAtomicNearResponse.java | 4 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 5 +- .../GridNearAtomicAbstractUpdateRequest.java | 4 +- .../GridNearAtomicCheckUpdateRequest.java | 4 +- .../atomic/GridNearAtomicUpdateResponse.java | 4 +- .../dht/colocated/GridDhtColocatedCache.java | 23 +- .../colocated/GridDhtColocatedCacheEntry.java | 52 - .../dht/preloader/GridDhtForceKeysFuture.java | 15 +- .../dht/preloader/GridDhtForceKeysRequest.java | 4 +- .../dht/preloader/GridDhtForceKeysResponse.java | 8 +- .../GridDhtPartitionDemandMessage.java | 12 +- .../dht/preloader/GridDhtPartitionDemander.java | 208 +- .../dht/preloader/GridDhtPartitionSupplier.java | 57 +- .../GridDhtPartitionSupplyMessage.java | 26 +- .../GridDhtPartitionsAbstractMessage.java | 26 +- .../GridDhtPartitionsExchangeFuture.java | 216 +- .../preloader/GridDhtPartitionsFullMessage.java | 62 +- .../GridDhtPartitionsSingleMessage.java | 41 +- .../GridDhtPartitionsSingleRequest.java | 7 +- .../dht/preloader/GridDhtPreloader.java | 410 +-- .../distributed/near/GridNearAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 14 +- .../distributed/near/GridNearGetRequest.java | 4 +- .../distributed/near/GridNearGetResponse.java | 4 +- .../near/GridNearSingleGetRequest.java | 4 +- .../near/GridNearSingleGetResponse.java | 4 +- .../near/GridNearTransactionalCache.java | 4 +- .../near/GridNearTxFinishResponse.java | 14 +- .../processors/cache/local/GridLocalCache.java | 15 +- .../local/atomic/GridLocalAtomicCache.java | 4 +- .../query/GridCacheDistributedQueryManager.java | 10 +- .../cache/query/GridCacheQueryManager.java | 4 +- .../cache/query/GridCacheQueryRequest.java | 3 +- .../cache/query/GridCacheQueryResponse.java | 4 +- .../CacheContinuousQueryBatchAck.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 7 +- .../cache/transactions/IgniteTxEntry.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 26 +- .../cache/transactions/TxLocksRequest.java | 20 +- .../cache/transactions/TxLocksResponse.java | 28 +- .../cluster/GridClusterStateProcessor.java | 4 +- .../visor/cache/VisorCachePartitionsTask.java | 2 +- .../CacheAtomicSingleMessageCountSelfTest.java | 2 +- .../cache/CacheDeferredDeleteQueueTest.java | 2 +- ...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +- ...CacheExchangeMessageDuplicatedStateTest.java | 54 +- .../cache/CacheOffheapMapEntrySelfTest.java | 9 +- .../GridCacheConditionalDeploymentSelfTest.java | 18 + .../processors/cache/GridCacheLeakTest.java | 3 +- .../GridCacheOrderedPreloadingSelfTest.java | 14 +- .../cache/GridCacheTtlManagerSelfTest.java | 3 +- .../processors/cache/IgniteCacheGroupsTest.java | 2406 ++++++++++++++++++ .../cache/IgniteCachePeekModesAbstractTest.java | 2 +- .../processors/cache/IgniteCacheStartTest.java | 5 +- .../cache/IgniteOnePhaseCommitInvokeTest.java | 4 +- ...niteTopologyValidatorGridSplitCacheTest.java | 8 +- .../IgniteTxStoreExceptionAbstractSelfTest.java | 4 +- .../GridCacheBinaryObjectsAbstractSelfTest.java | 2 +- .../GridCacheQueueCleanupSelfTest.java | 13 +- .../GridCacheSetAbstractSelfTest.java | 17 +- .../GridCacheSetFailoverAbstractSelfTest.java | 6 +- .../IgnitePartitionedQueueNoBackupsTest.java | 6 +- .../IgnitePartitionedSetNoBackupsSelfTest.java | 6 +- .../CacheDiscoveryDataConcurrentJoinTest.java | 17 + .../CacheLateAffinityAssignmentTest.java | 6 +- .../IgniteCachePartitionLossPolicySelfTest.java | 19 +- .../IgniteCacheReadFromBackupTest.java | 5 +- ...sabledMultiNodeWithGroupFullApiSelfTest.java | 35 + .../atomic/IgniteCacheAtomicProtocolTest.java | 7 +- ...AtomicMultiNodeWithGroupFullApiSelfTest.java | 34 + ...nabledMultiNodeWithGroupFullApiSelfTest.java | 35 + .../near/GridCacheNearReadersSelfTest.java | 4 +- ...tionedMultiNodeWithGroupFullApiSelfTest.java | 34 + .../GridCacheReplicatedPreloadSelfTest.java | 3 +- .../IgniteCacheClientNearCacheExpiryTest.java | 20 +- .../expiry/IgniteCacheTtlCleanupSelfTest.java | 2 +- ...IgniteCacheJdbcBlobStoreNodeRestartTest.java | 3 + ...acheLocalAtomicWithGroupFullApiSelfTest.java | 34 + .../local/GridCacheLocalFullApiSelfTest.java | 1 - .../GridCacheLocalWithGroupFullApiSelfTest.java | 34 + .../TxOptimisticDeadlockDetectionTest.java | 2 +- .../TxPessimisticDeadlockDetectionTest.java | 2 +- .../loadtests/hashmap/GridCacheTestContext.java | 6 +- .../communication/GridCacheMessageSelfTest.java | 30 + .../testframework/junits/GridAbstractTest.java | 17 + .../junits/common/GridCommonAbstractTest.java | 64 + .../IgniteCacheFullApiSelfTestSuite.java | 13 + .../testsuites/IgniteCacheTestSuite3.java | 3 + .../query/h2/database/H2PkHashIndex.java | 4 +- .../query/h2/database/H2RowFactory.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 12 +- .../cache/IgniteCacheGroupsSqlTest.java | 144 ++ .../cache/IgniteCacheNoClassQuerySelfTest.java | 23 +- .../IgniteCacheWithIndexingTestSuite.java | 3 + .../yardstick/IgniteBenchmarkArguments.java | 12 + .../org/apache/ignite/yardstick/IgniteNode.java | 3 + 176 files changed, 9049 insertions(+), 3043 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 6601591,c3311a8..875f684 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@@ -515,10 -527,10 +532,10 @@@ public class CacheAffinitySharedManager /** * */ - public void removeAllCacheInfo(){ + public void removeAllCacheInfo() { - caches.clear(); + grpHolders.clear(); - registeredCaches.clear(); + registeredGrps.clear(); } /** @@@ -720,8 -733,7 +738,8 @@@ * @param nodeId Node ID. * @param res Response. */ - private void processAffinityAssignmentResponse(UUID nodeId, - private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, GridDhtAffinityAssignmentResponse res) { ++ private void processAffinityAssignmentResponse(Integer grpId, UUID nodeId, + GridDhtAffinityAssignmentResponse res) { if (log.isDebugEnabled()) log.debug("Processing affinity assignment response [node=" + nodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c7d1fa7,96ae0b9..976b843 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -845,10 -844,9 +846,9 @@@ public class GridCachePartitionExchange /** * @param nodes Nodes. - * @return {@code True} if message was sent, {@code false} if node left grid. */ - private boolean sendAllPartitions(Collection<ClusterNode> nodes) { + private void sendAllPartitions(Collection<ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, null, null, true); if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); @@@ -1266,13 -1253,13 +1260,13 @@@ GridDhtPartitionTopology top = null; - if (cacheCtx == null) - top = clientTops.get(cacheId); - else if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + if (grp == null) + top = clientTops.get(grpId); + else if (!grp.isLocal()) + top = grp.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), cacheId)) != null; } if (!cctx.kernalContext().clientNode() && updated) http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 3f26871,e00ba5f..7dd457b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -73,9 -73,9 +73,10 @@@ import org.apache.ignite.internal.pagem import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.database.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; @@@ -1229,13 -1291,27 +1293,19 @@@ public class GridCacheProcessor extend ctx.kernalContext().cache().context().database().onCacheStop(ctx); + ctx.kernalContext().cache().context().snapshot().onCacheStop(ctx); + + ctx.group().stopCache(ctx, destroy); + U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore())); - if (log.isInfoEnabled()) - log.info("Stopped cache: " + cache.name()); + if (log.isInfoEnabled()) { + if (ctx.group().sharedGroup()) + log.info("Stopped cache [cacheName=" + cache.name() + ", group=" + ctx.group().name() + ']'); + else + log.info("Stopped cache [cacheName=" + cache.name() + ']'); + } - if (sharedCtx.pageStore() != null) { - try { - sharedCtx.pageStore().shutdownForCache(ctx, destroy); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully clean page store resources for destroyed cache " + - "[cache=" + ctx.name() + "]", e); - } - } - cleanup(ctx); } @@@ -1855,10 -1930,9 +1928,12 @@@ startCache(cache, schema != null ? schema : new QuerySchema()); + grp.onCacheStarted(cacheCtx); + onKernalStart(cache); + + if (proxyRestart) + proxy.onRestarted(cacheCtx, cache); } /** @@@ -1963,28 -2069,16 +2088,31 @@@ for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) { stopGateway(action.request()); - prepareCacheStop(action.request()); + sharedCtx.database().checkpointReadLock(); + + try { + stopCtx = prepareCacheStop(action.request()); + destroy = action.request().destroy(); + } + finally { + sharedCtx.database().checkpointReadUnlock(); + } + + if (stopCtx != null) { + if (stopped == null) + stopped = new ArrayList<>(); + + stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy)); + } } - for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) { - String cacheName = req.cacheName(); + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) + stopCacheGroup(grpDesc.groupId()); + + for (ExchangeActions.ActionData req : exchActions.closeRequests(ctx.localNodeId())) { + String cacheName = req.request().cacheName(); - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); + IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(cacheName); if (proxy != null) { if (proxy.context().affinityNode()) { @@@ -2000,28 -2093,13 +2128,33 @@@ proxy.context().gate().onStopped(); - CacheGroupInfrastructure grp = prepareCacheStop(req.request()); + sharedCtx.database().checkpointReadLock(); + + try { + stopCtx = prepareCacheStop(req); + - if (grp != null && !grp.hasCaches()) - stopCacheGroup(grp.groupId()); + destroy = req.destroy(); ++ ++ if (stopCtx != null && !stopCtx.group().hasCaches()) ++ stopCacheGroup(stopCtx.groupId()); ++ + } + finally { + sharedCtx.database().checkpointReadUnlock(); + } } } + + if (stopCtx != null) { + if (stopped == null) + stopped = new ArrayList<>(); + + stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy)); + } } + + if (stopped != null && !sharedCtx.kernalContext().clientNode()) + sharedCtx.database().onCachesStopped(stopped); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 5bcefda,d344e20..7ea1f9a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@@ -96,9 -126,11 +126,10 @@@ public interface IgniteCacheOffheapMana public CacheDataStore dataStore(GridDhtLocalPartition part); /** - * @param p Partition ID. * @param store Data store. + * @throws IgniteCheckedException If failed. */ - public void destroyCacheDataStore(int p, CacheDataStore store) throws IgniteCheckedException; + public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException; /** * TODO: GG-10884, used on only from initialValue. @@@ -147,16 -185,7 +182,17 @@@ ) throws IgniteCheckedException; /** + * @param key Key. + * @param part Partition. + * @throws IgniteCheckedException If failed. + */ + public void updateIndexes( + KeyCacheObject key, + GridDhtLocalPartition part + ) throws IgniteCheckedException; + + /** + * @param cctx Cache context. * @param key Key. * @param partId Partition number. * @param part Partition. @@@ -348,26 -403,26 +410,30 @@@ @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. - * @param part Partition. * @param val Value. * @param ver Version. * @param expireTime Expire time. * @param oldRow Old row if available. * @throws IgniteCheckedException If failed. */ - void update(KeyCacheObject key, + void update( + GridCacheContext cctx, + KeyCacheObject key, - int part, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** + * @param key Key. + * @throws IgniteCheckedException If failed. + */ + void updateIndexes(KeyCacheObject key) throws IgniteCheckedException; + + /** + * @param cctx Cache context. * @param key Key. * @param c Closure. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 947421b,8da7357..db08801 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@@ -81,8 -78,21 +79,20 @@@ import static org.apache.ignite.interna * */ @SuppressWarnings("PublicInnerClass") - public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter implements IgniteCacheOffheapManager { + public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager { + /** */ + private static final int UNDEFINED_CACHE_ID = 0; + + /** */ + protected GridCacheSharedContext ctx; + + /** */ + protected CacheGroupInfrastructure grp; + + /** */ + protected IgniteLogger log; + /** */ - // TODO GG-11208 need restore size after restart. private CacheDataStore locCacheDataStore; /** */ @@@ -118,15 -122,15 +125,17 @@@ } /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - super.start0(); + @Override public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException { + this.ctx = ctx; + this.grp = grp; + this.log = ctx.logger(getClass()); + indexingEnabled = QueryUtils.isEnabled(cctx.config()); + - updateValSizeThreshold = cctx.shared().database().pageSize() / 2; + updateValSizeThreshold = ctx.database().pageSize() / 2; - if (cctx.affinityNode()) { - cctx.shared().database().checkpointReadLock(); + if (grp.affinityNode()) { + ctx.database().checkpointReadLock(); try { initDataStructures(); @@@ -170,9 -175,21 +180,21 @@@ } /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - super.onKernalStop0(cancel); + @Override public void stop() { + try { + for (CacheDataStore store : cacheDataStores()) - store.destroy(); ++ destroyCacheDataStore(store); + + if (pendingEntries != null) + pendingEntries.destroy(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e.getMessage(), e); + } + } + /** {@inheritDoc} */ + @Override public void onKernalStop() { busyLock.block(); } @@@ -229,10 -253,10 +258,10 @@@ * @return Partition data. */ @Nullable private CacheDataStore partitionData(int p) { - if (cctx.isLocal()) + if (grp.isLocal()) return locCacheDataStore; else { - GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true); - GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false); ++ GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true); return part != null ? part.dataStore() : null; } @@@ -341,12 -351,7 +355,12 @@@ ) throws IgniteCheckedException { assert expireTime >= 0; - dataStore(part).update(key, val, ver, expireTime, oldRow); - dataStore(part).update(cctx, key, partId, val, ver, expireTime, oldRow); ++ dataStore(part).update(cctx, key, val, ver, expireTime, oldRow); + } + + /** {@inheritDoc} */ + @Override public void updateIndexes(KeyCacheObject key, GridDhtLocalPartition part) throws IgniteCheckedException { + dataStore(part).updateIndexes(key); } /** {@inheritDoc} */ @@@ -419,34 -425,27 +434,34 @@@ * @param readers {@code True} to clear readers. */ @SuppressWarnings("unchecked") - @Override public void clear(boolean readers) { + @Override public void clear(GridCacheContext cctx, boolean readers) { GridCacheVersion obsoleteVer = null; - GridIterator<CacheDataRow> it = rowsIterator(true, true, null); + GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator()); while (it.hasNext()) { - KeyCacheObject key = it.next().key(); + cctx.shared().database().checkpointReadLock(); try { - if (obsoleteVer == null) - obsoleteVer = ctx.versions().next(); + KeyCacheObject key = it.next().key(); - GridCacheEntryEx entry = cctx.cache().entryEx(key); + try { + if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); ++ obsoleteVer = ctx.versions().next(); - entry.clear(obsoleteVer, readers); - } - catch (GridDhtInvalidPartitionException ignore) { - // Ignore. + GridCacheEntryEx entry = cctx.cache().entryEx(key); + + entry.clear(obsoleteVer, readers); + } + catch (GridDhtInvalidPartitionException ignore) { + // Ignore. + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to clear cache entry: " + key, e); + } } - catch (IgniteCheckedException e) { - U.error(log, "Failed to clear cache entry: " + key, e); + finally { + cctx.shared().database().checkpointReadUnlock(); } } } @@@ -826,39 -826,41 +864,46 @@@ IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount ) throws IgniteCheckedException { + assert !cctx.isNear() : cctx.name(); + if (hasPendingEntries && pendingEntries != null) { - GridCacheVersion obsoleteVer = null; + cctx.shared().database().checkpointReadLock(); - long now = U.currentTimeMillis(); + try { + GridCacheVersion obsoleteVer = null; + + long now = U.currentTimeMillis(); - GridCursor<PendingRow> cur = pendingEntries.find(START_PENDING_ROW, new PendingRow(now, 0)); - GridCursor<PendingRow> cur; ++ GridCursor<PendingRow> cur; + + if (grp.sharedGroup()) + cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); + else + cur = pendingEntries.find(null, new PendingRow(UNDEFINED_CACHE_ID, now, 0)); - int cleared = 0; + int cleared = 0; - while (cur.next()) { - PendingRow row = cur.get(); + while (cur.next()) { + PendingRow row = cur.get(); - if (amount != -1 && cleared > amount) - return true; + if (amount != -1 && cleared > amount) + return true; if (row.key.partition() == -1) - row.key.partition(cctx.affinity().partition(row.key)); + row.key.partition(cctx.affinity().partition(row.key));assert row.key != null && row.link != 0 && row.expireTime != 0 : row; - if (pendingEntries.remove(row) != null) { - assert row.key != null && row.link != 0 && row.expireTime != 0 : row; ++ if (pendingEntries.removex(row)) { + if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); ++ obsoleteVer = ctx.versions().next(); - if (pendingEntries.removex(row)) { - if (obsoleteVer == null) - obsoleteVer = ctx.versions().next(); + c.apply(cctx.cache().entryEx(row.key), obsoleteVer); + } - c.apply(cctx.cache().entryEx(row.key), obsoleteVer); + cleared++; } - - cleared++; + } + finally { + cctx.shared().database().checkpointReadUnlock(); } } @@@ -1043,8 -1101,10 +1144,8 @@@ } /** {@inheritDoc} */ -- @Override public void update( - GridCacheContext cctx, -- KeyCacheObject key, - int p, ++ @Override public void update(GridCacheContext cctx,KeyCacheObject key, ++ CacheObject val, GridCacheVersion ver, long expireTime, @@@ -1056,10 -1115,11 +1157,11 @@@ throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ? - cctx.cacheId() : 0; + int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + + assert oldRow == null || oldRow.cacheId() == cacheId : oldRow; - DataRow dataRow = new DataRow(key, val, ver, p, expireTime, cacheId); + DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId); CacheObjectContext coCtx = cctx.cacheObjectContext(); @@@ -1152,28 -1216,7 +1261,28 @@@ } /** {@inheritDoc} */ + @Override public void updateIndexes(KeyCacheObject key) throws IgniteCheckedException { + if (indexingEnabled) { + CacheDataRow row = dataTree.findOne(new SearchRow(key)); + + GridCacheQueryManager qryMgr = cctx.queries(); + + if (row != null) { + qryMgr.store( + key, + partId, + null, + null, + row.value(), + row.version(), + row.expireTime(), + row.link()); + } + } + } + + /** {@inheritDoc} */ - @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException { + @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index c966877,0955a51..59a5bb9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@@ -306,9 -305,9 +305,9 @@@ public abstract class GridDistributedCa IgniteCacheOffheapManager offheap = ctx.offheap(); - if (ctx.affinity().primaryByPartition(ctx.localNode(), partition, topVer) && modes.primary || - ctx.affinity().backupByPartition(ctx.localNode(), partition, topVer) && modes.backup) - size += offheap.cacheEntriesCount(ctx.cacheId(), partition); + if (ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary || + ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup) - size += offheap.entriesCount(part); ++ size += offheap.cacheEntriesCount(ctx.cacheId(), part); } return size; http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 31edeea,e94415c..fe92cfb --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -355,14 -353,8 +355,14 @@@ public class GridClientPartitionTopolog } /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { + return localPartition(p, topVer, create); + } + + /** {@inheritDoc} */ - @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(1, AffinityTopologyVersion.NONE, create); + @Override public GridDhtLocalPartition localPartition(int p) { + return localPartition(p, AffinityTopologyVersion.NONE, false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 1d910a3,bbb3cc5..5fd7ada --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@@ -54,7 -61,7 +61,8 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; @@@ -133,22 -152,35 +153,36 @@@ public class GridDhtLocalPartition exte * reservation is released. */ private volatile boolean shouldBeRenting; + /** Set if partition must be re-created and preloaded after eviction. */ + private boolean reload; + /** - * @param cctx Context. + * @param ctx Context. + * @param grp Cache group. * @param id Partition ID. * @param entryFactory Entry factory. */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition(GridCacheContext cctx, - int id, GridCacheMapEntryFactory entryFactory) { - super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions())); + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + GridDhtLocalPartition(GridCacheSharedContext ctx, - CacheGroupInfrastructure grp, - int id, - GridCacheMapEntryFactory entryFactory) { - super(entryFactory); ++ CacheGroupInfrastructure grp, int id, GridCacheMapEntryFactory entryFactory) { ++ super( entryFactory); this.id = id; - this.cctx = cctx; + this.ctx = ctx; + this.grp = grp; - log = U.logger(cctx.kernalContext(), logRef, this); + log = U.logger(ctx.kernalContext(), logRef, this); + + if (grp.sharedGroup()) { + singleCacheEntryMap = null; + cachesEntryMaps = new ConcurrentHashMap<>(); + cacheSizes = new ConcurrentHashMap<>(); + } + else { + singleCacheEntryMap = createEntriesMap(); + cachesEntryMaps = null; + cacheSizes = null; + } rent = new GridFutureAdapter<Object>() { @Override public String toString() { @@@ -756,7 -867,9 +870,7 @@@ */ private void destroyCacheDataStore() { try { - cctx.offheap().destroyCacheDataStore(dataStore()); - CacheDataStore store = dataStore(); - - grp.offheap().destroyCacheDataStore(id, store); ++ grp.offheap().destroyCacheDataStore(dataStore()); } catch (IgniteCheckedException e) { log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e); @@@ -900,31 -1030,24 +1026,29 @@@ true, false); - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + cctx.shared().database().checkpointReadLock(); + - try { - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { - if (rec) { - cctx.events().addEvent(cached.partition(), - cached.key(), - cctx.localNodeId(), - (IgniteUuid)null, - null, - EVT_CACHE_REBALANCE_OBJECT_UNLOADED, - null, - false, - cached.rawGet(), - cached.hasValue(), - null, - null, - null, - false); - } ++ try {if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + if (rec) { + cctx.events().addEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + null, + null, + null, - false); ++ false);} } } + finally { + cctx.shared().database().checkpointReadUnlock(); + } } catch (GridDhtInvalidPartitionException e) { assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 938c1be,1a36e4d..f006c83 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -68,7 -73,8 +70,8 @@@ import static org.apache.ignite.interna /** * Partition topology. */ - @GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { + @GridToStringExclude -public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ++publicclass GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@@ -127,20 -136,24 +133,26 @@@ private volatile boolean treatAllPartAsLoc; /** - * @param cctx Context. + * @param ctx Cache shared context. + * @param grp Cache group. * @param entryFactory Entry factory. */ - GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) { - assert cctx != null; - - this.cctx = cctx; + public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx, + CacheGroupInfrastructure grp, + GridCacheMapEntryFactory entryFactory) { + assert ctx != null; + assert grp != null; + assert entryFactory != null; + + this.ctx = ctx; + this.grp = grp; this.entryFactory = entryFactory; - log = cctx.logger(getClass()); + log = ctx.logger(getClass()); - locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions()); + locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); + + part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f); } /** {@inheritDoc} */ @@@ -404,100 -503,119 +415,109 @@@ ClusterState newState = exchFut.newClusterState(); treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE) - || (cctx.kernalContext().state().active() + || (ctx.kernalContext().state().active() && discoEvt.type() == EventType.EVT_NODE_JOINED && discoEvt.eventNode().isLocal() - && !cctx.kernalContext().clientNode() + && !ctx.kernalContext().clientNode() ); - ClusterNode loc = cctx.localNode(); - // Wait for rent outside of checkpoint lock. - waitForRent(); - + ClusterNode loc = ctx.localNode(); - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); - synchronized (ctx.exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); + try { - synchronized (cctx.shared().exchange().interruptLock()) { ++ synchronized (ctx.exchange().interruptLock()) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); - try { U.writeLock(lock); - } - catch (IgniteInterruptedCheckedException e) { - ctx.database().checkpointReadUnlock(); - throw e; - } + try { + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - try { - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + if (stopping) + return; - if (stopping) - return; + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + if (exchId.isLeft()) + removeNode(exchId.nodeId()); - if (exchId.isLeft()) - removeNode(exchId.nodeId()); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); - if (log.isDebugEnabled()) - log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + long updateSeq = this.updateSeq.incrementAndGet(); - long updateSeq = this.updateSeq.incrementAndGet(); + cntrMap.clear(); - cntrMap.clear(); + // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) { ++ if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); - // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + - node2part + ']'); + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } } - else if (!node2part.nodeId().equals(loc.id())) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); + if (affReady) + initPartitions0(exchFut, updateSeq); + else { - List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); ++ List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); + + createPartitions(aff, updateSeq); } - } - if (affReady) - initPartitions0(exchFut, updateSeq); - else { - List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); + consistencyCheck(); - createPartitions(aff, updateSeq); + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + } + finally { + lock.writeLock().unlock(); } - - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + - fullMapString() + ']'); - } - finally { - lock.writeLock().unlock(); - - ctx.database().checkpointReadUnlock(); } } - - // Wait for evictions. - waitForRent(); + finally { - cctx.shared().database().checkpointReadUnlock(); ++ ctx.database().checkpointReadUnlock(); + } } + /** + * @param p Partition number. + * @param topVer Topology version. + * @return {@code True} if given partition belongs to local node. + */ + private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { + return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); + } + /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { treatAllPartAsLoc = false; - boolean changed = waitForRent(); + boolean changed = false; - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); AffinityTopologyVersion topVer = exchFut.topologyVersion(); @@@ -522,9 -640,9 +542,9 @@@ long updateSeq = this.updateSeq.incrementAndGet(); for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); + GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, // which obviously has not happened at this point. if (locPart == null) { @@@ -866,8 -980,8 +890,8 @@@ for (UUID nodeId : nodeIds) { HashSet<UUID> affIds = affAssignment.getIds(p); - if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { + if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = ctx.discovery().node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) { @@@ -1113,13 -1225,15 +1141,13 @@@ } } - part2node = p2n; - boolean changed = false; - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); - GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId()); + GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); - if (nodeMap != null && cctx.shared().database().persistenceEnabled()) { + if (nodeMap != null && ctx.database().persistenceEnabled()) { for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { int p = e.getKey(); GridDhtPartitionState state = e.getValue(); @@@ -1185,10 -1259,8 +1213,10 @@@ } } + long updateSeq = this.updateSeq.incrementAndGet(); + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@@ -1531,18 -1599,9 +1563,18 @@@ GridDhtLocalPartition locPart = locParts.get(p); if (locPart != null) { - if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) { - if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) - locPart.moving(); ++ if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) { + if (haveHistory) + locPart.moving(); + else { + locPart.rent(false); + + locPart.reload(true); + + result.add(cctx.localNodeId()); + } + + } } for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index ef6a3b9,04a7e97..4a693bf --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@@ -89,9 -85,8 +89,9 @@@ public class GridDhtPartitionDemandMess * @param cp Message to copy from. * @param parts Partitions. */ - GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, Map<Integer, Long> partsCntrs) { + GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, + Map<Integer, Long> partsCntrs) { - cacheId = cp.cacheId; + grpId = cp.grpId; updateSeq = cp.updateSeq; topic = cp.topic; timeout = cp.timeout; http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 77a645e,c9a6525..485baee --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@@ -610,18 -594,11 +606,20 @@@ public class GridDhtPartitionDemander return; } - final GridDhtPartitionTopology top = cctx.dht().topology(); + final GridDhtPartitionTopology top = grp.topology(); + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + + if (statsEnabled) { + if (supply.estimatedKeysCount() != -1) + cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); + + cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + } + try { + AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + // Preload. for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { int p = e.getKey(); @@@ -867,16 -836,13 +867,14 @@@ long updateSeq) { assert assigns != null; - this.exchFut = assigns.exchangeFuture(); - this.topVer = assigns.topologyVersion(); + exchFut = assigns.exchangeFuture(); + topVer = assigns.topologyVersion(); + - this.cctx = cctx; + this.grp = grp; this.log = log; - this.startedEvtSent = startedEvtSent; - this.stoppedEvtSent = stoppedEvtSent; this.updateSeq = updateSeq; + + ctx= grp.shared(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 0ff03f7,ce5f9ea..0c907f5 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@@ -295,16 -285,10 +300,16 @@@ class GridDhtPartitionSupplier IgniteRebalanceIterator iter; if (sctx == null || sctx.entryIt == null) { - iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), - iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); ++ iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), + d.isHistorical(part) ? d.partitionCounter(part) : null); + + if (!iter.historical()) { + assert !cctx.shared().database().persistenceEnabled() || !d.isHistorical(part); - if (!iter.historical()) s.clean(part); + } + else + assert cctx.shared().database().persistenceEnabled() && d.isHistorical(part); } else iter = (IgniteRebalanceIterator)sctx.entryIt; http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 1cb32e3,5d02f3f..9f66491 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@@ -74,14 -75,12 +75,14 @@@ public class GridDhtPartitionSupplyMess private Map<Integer, CacheEntryInfoCollection> infos; /** Message size. */ - @GridDirectTransient private int msgSize; + /** Estimated keys count. */ + private long estimatedKeysCnt = -1; + /** * @param updateSeq Update sequence for this node. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param topVer Topology version. * @param addDepInfo Deployment info flag. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c75b0a2,6725773..65edd96 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -57,11 -55,11 +57,12 @@@ import org.apache.ignite.internal.pagem import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; + import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@@ -714,16 -643,16 +733,16 @@@ public class GridDhtPartitionsExchangeF long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence(); - GridDhtPartitionTopology top = cacheCtx.topology(); + GridDhtPartitionTopology top = grp.topology(); if (crd) { - boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); + boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) - top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); + top.update(this, clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet()); } - top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); + top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId())); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) @@@ -827,11 -756,12 +846,15 @@@ if (updateTop) { for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - if (top.cacheId() == cacheCtx.cacheId()) { - cacheCtx.topology().update(this, - top.partitionMap(true), + if (top.groupId() == grp.groupId()) { + GridDhtPartitionFullMap fullMap = top.partitionMap(true); + + assert fullMap != null; + - grp.topology().update(exchId, fullMap, top.updateCounters(false)); ++ grp.topology().update(this, ++ fullMap, + top.updateCounters(false), + Collections.<Integer>emptySet()); break; } @@@ -869,18 -799,13 +892,18 @@@ assert !cctx.kernalContext().clientNode(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.preloader().onTopologyChanged(this); + grp.preloader().onTopologyChanged(this); } + cctx.database().releaseHistoryForPreloading(); + + // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange. + partHistReserved = cctx.database().reserveHistoryForExchange(); + waitPartitionRelease(); boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; @@@ -1212,12 -1138,9 +1248,12 @@@ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { boolean realExchange = !dummy && !forcePreload; + if (!done.compareAndSet(false, true)) + return dummy; + if (err == null && realExchange) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; try { @@@ -1607,12 -1489,11 +1647,12 @@@ */ private void assignPartitionStates(GridDhtPartitionTopology top) { Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>(); + Map<Integer, Long> minCntrs = new HashMap<>(); - for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - assert e.getValue().partitionUpdateCounters(top.cacheId()) != null; + assert e.getValue().partitionUpdateCounters(top.groupId()) != null; - for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) { + for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) { int p = e0.getKey(); UUID uuid = e.getKey(); @@@ -1768,12 -1592,10 +1816,12 @@@ try { assert crd.isLocal(); + assert partHistSuppliers.isEmpty(); + if (!crd.equals(discoCache.serverNodes().get(0))) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange(this, !centralizedAff); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) + grp.topology().beforeExchange(this, !centralizedAff); } } @@@ -1975,25 -1797,20 +2023,25 @@@ private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) { cctx.versions().onExchange(msg.lastVersion().order()); + assert partHistSuppliers.isEmpty(); + + partHistSuppliers.putAll(msg.partitionHistorySuppliers()); + for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); + Integer grpId = entry.getKey(); - Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId); + Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx != null) - cacheCtx.topology().update(this, entry.getValue(), cntrMap, + if (grp != null) - grp.topology().update(exchId, entry.getValue(), cntrMap); ++ grp.topology().update(this, entry.getValue(), cntrMap, + msg.partsToReload(cctx.localNodeId(), cacheId)); else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet()); - cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap); ++ cctx.exchange().clientTopology(grpId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 94ad21e,f9bc5df..b64a58c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@@ -121,10 -102,13 +121,15 @@@ public class GridDhtPartitionsFullMessa assert id == null || topVer.equals(id.topologyVersion()); this.topVer = topVer; + this.partHistSuppliers = partHistSuppliers; + this.partsToReload = partsToReload; } + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + /** * @param compress {@code True} if it is possible to use compression for message. */ @@@ -175,24 -161,27 +182,23 @@@ } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) - partCntrs = new HashMap<>(); + partCntrs = new IgniteDhtPartitionCountersMap(); - partCntrs.putIfAbsent(cacheId, cntrMap); - if (!partCntrs.containsKey(grpId)) - partCntrs.put(grpId, cntrMap); ++ partCntrs.putIfAbsent(grpId, cntrMap); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition update counters. */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { - if (partCntrs != null) { - return partCntrs.get(cacheId); - } + @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { - if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId); - - return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap(); - } ++ if (partCntrs != null) ++ return partCntrs.get(grpId); return Collections.emptyMap(); } @@@ -414,25 -356,13 +420,25 @@@ writer.incrementState(); - case 9: + case 8: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) return false; writer.incrementState(); - case 10: + case 9: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + return false; + + writer.incrementState(); + + case 12: if (!writer.writeMessage("topVer", topVer)) return false; @@@ -478,31 -408,15 +484,31 @@@ reader.incrementState(); - case 9: + case 8: - partsBytes = reader.readByteArray("partsBytes"); + partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 10: + case 9: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/08404350/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9222251,416b127..9e399f1 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@@ -375,13 -320,7 +380,13 @@@ public class GridDhtPartitionsSingleMes writer.incrementState(); - case 10: + case 9: + if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@@ -435,15 -374,7 +440,15 @@@ reader.incrementState(); - case 10: + case 9: + partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead())
