ignite-5075 'logical' caches sharing the same 'physical' cache group
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e45010b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e45010b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e45010b Branch: refs/heads/ignite-5267 Commit: 7e45010b4848d0a570995e6dc938875710d846d8 Parents: c7a7e64 Author: sboikov <[email protected]> Authored: Sun Jun 4 11:02:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Sun Jun 4 11:02:37 2017 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 21 + .../managers/communication/GridIoManager.java | 12 +- .../internal/managers/discovery/DiscoCache.java | 30 +- .../discovery/GridDiscoveryManager.java | 178 +- .../ignite/internal/pagemem/PageUtils.java | 16 + .../pagemem/snapshot/SnapshotOperation.java | 18 +- .../pagemem/store/IgnitePageStoreManager.java | 50 +- .../pagemem/wal/record/CheckpointRecord.java | 22 +- .../MetaPageUpdatePartitionDataRecord.java | 14 +- .../affinity/GridAffinityAssignmentCache.java | 38 +- .../cache/CacheAffinitySharedManager.java | 627 +-- .../CacheClientReconnectDiscoveryData.java | 66 +- .../internal/processors/cache/CacheData.java | 22 +- .../processors/cache/CacheGroupContext.java | 964 +++++ .../processors/cache/CacheGroupData.java | 147 + .../processors/cache/CacheGroupDescriptor.java | 210 + .../cache/CacheJoinNodeDiscoveryData.java | 4 +- .../processors/cache/CacheMetricsImpl.java | 10 +- .../cache/CacheNodeCommonDiscoveryData.java | 19 + .../cache/CacheOffheapEvictionManager.java | 11 +- .../processors/cache/ClusterCachesInfo.java | 531 ++- .../cache/ClusterCachesReconnectResult.java | 61 + .../cache/DynamicCacheDescriptor.java | 26 + .../processors/cache/ExchangeActions.java | 86 +- .../processors/cache/GridCacheAdapter.java | 63 +- .../cache/GridCacheAffinityManager.java | 62 +- .../processors/cache/GridCacheAttributes.java | 21 +- .../cache/GridCacheClearAllRunnable.java | 2 +- .../cache/GridCacheConcurrentMap.java | 55 +- .../cache/GridCacheConcurrentMapImpl.java | 171 +- .../processors/cache/GridCacheContext.java | 167 +- .../processors/cache/GridCacheEntryInfo.java | 37 +- .../processors/cache/GridCacheEventManager.java | 36 - .../cache/GridCacheGroupIdMessage.java | 110 + .../processors/cache/GridCacheIdMessage.java | 117 + .../processors/cache/GridCacheIoManager.java | 282 +- .../cache/GridCacheLocalConcurrentMap.java | 56 +- .../processors/cache/GridCacheMapEntry.java | 76 +- .../processors/cache/GridCacheMessage.java | 97 +- .../GridCachePartitionExchangeManager.java | 298 +- .../processors/cache/GridCachePreloader.java | 23 +- .../cache/GridCachePreloaderAdapter.java | 46 +- .../processors/cache/GridCacheProcessor.java | 476 ++- .../cache/GridCacheSharedContext.java | 4 +- .../processors/cache/GridCacheTtlManager.java | 22 +- .../processors/cache/GridCacheUtils.java | 42 +- .../GridChangeGlobalStateMessageResponse.java | 20 +- .../processors/cache/GridNoStorageCacheMap.java | 30 +- .../cache/IgniteCacheOffheapManager.java | 169 +- .../cache/IgniteCacheOffheapManagerImpl.java | 1260 ++++-- .../cache/affinity/GridCacheAffinityImpl.java | 9 +- .../processors/cache/database/CacheDataRow.java | 5 - .../cache/database/CacheDataRowAdapter.java | 65 +- .../cache/database/CacheSearchRow.java | 5 + .../IgniteCacheDatabaseSharedManager.java | 9 +- .../database/IgniteCacheSnapshotManager.java | 14 +- .../processors/cache/database/MetaStore.java | 6 +- .../cache/database/MetadataStorage.java | 22 +- .../processors/cache/database/RowStore.java | 29 +- .../cache/database/tree/BPlusTree.java | 39 +- .../cache/database/tree/io/PageIO.java | 30 + .../tree/io/PagePartitionCountersIO.java | 175 + .../database/tree/io/PagePartitionMetaIO.java | 20 + .../distributed/GridCacheTtlUpdateRequest.java | 4 +- .../distributed/GridDistributedBaseMessage.java | 4 +- .../GridDistributedCacheAdapter.java | 13 +- .../GridDistributedTxFinishResponse.java | 28 +- .../GridDistributedTxRemoteAdapter.java | 2 + .../dht/GridCachePartitionedConcurrentMap.java | 76 +- .../dht/GridClientPartitionTopology.java | 25 +- .../dht/GridDhtAffinityAssignmentRequest.java | 10 +- .../dht/GridDhtAffinityAssignmentResponse.java | 12 +- .../dht/GridDhtAssignmentFetchFuture.java | 22 +- .../distributed/dht/GridDhtCacheAdapter.java | 363 +- .../distributed/dht/GridDhtCacheEntry.java | 16 +- .../cache/distributed/dht/GridDhtGetFuture.java | 8 +- .../distributed/dht/GridDhtGetSingleFuture.java | 39 +- .../distributed/dht/GridDhtLocalPartition.java | 444 ++- .../distributed/dht/GridDhtLockResponse.java | 2 +- .../dht/GridDhtPartitionTopology.java | 9 +- .../dht/GridDhtPartitionTopologyImpl.java | 278 +- .../dht/GridDhtTransactionalCacheAdapter.java | 37 +- .../dht/GridDhtTxFinishResponse.java | 14 +- .../dht/GridDhtTxOnePhaseCommitAckRequest.java | 16 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 2 +- .../GridDhtAtomicAbstractUpdateRequest.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 68 +- .../dht/atomic/GridDhtAtomicCacheEntry.java | 53 - .../GridDhtAtomicDeferredUpdateResponse.java | 4 +- .../dht/atomic/GridDhtAtomicNearResponse.java | 4 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 5 +- .../GridNearAtomicAbstractUpdateRequest.java | 4 +- .../GridNearAtomicCheckUpdateRequest.java | 4 +- .../atomic/GridNearAtomicUpdateResponse.java | 4 +- .../dht/colocated/GridDhtColocatedCache.java | 24 +- .../colocated/GridDhtColocatedCacheEntry.java | 52 - .../dht/preloader/GridDhtForceKeysFuture.java | 15 +- .../dht/preloader/GridDhtForceKeysRequest.java | 4 +- .../dht/preloader/GridDhtForceKeysResponse.java | 8 +- .../GridDhtPartitionDemandMessage.java | 12 +- .../dht/preloader/GridDhtPartitionDemander.java | 262 +- .../dht/preloader/GridDhtPartitionSupplier.java | 73 +- .../GridDhtPartitionSupplyMessage.java | 116 +- .../GridDhtPartitionsAbstractMessage.java | 26 +- .../GridDhtPartitionsExchangeFuture.java | 259 +- .../preloader/GridDhtPartitionsFullMessage.java | 74 +- .../GridDhtPartitionsSingleMessage.java | 62 +- .../GridDhtPartitionsSingleRequest.java | 7 +- .../dht/preloader/GridDhtPreloader.java | 419 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 16 +- .../distributed/near/GridNearGetRequest.java | 4 +- .../distributed/near/GridNearGetResponse.java | 4 +- .../near/GridNearOptimisticTxPrepareFuture.java | 21 +- .../near/GridNearSingleGetRequest.java | 4 +- .../near/GridNearSingleGetResponse.java | 6 +- .../near/GridNearTransactionalCache.java | 4 +- .../near/GridNearTxFinishResponse.java | 14 +- .../processors/cache/local/GridLocalCache.java | 21 +- .../local/atomic/GridLocalAtomicCache.java | 4 +- .../query/GridCacheDistributedQueryManager.java | 10 +- .../cache/query/GridCacheQueryManager.java | 4 +- .../cache/query/GridCacheQueryRequest.java | 3 +- .../cache/query/GridCacheQueryResponse.java | 4 +- .../CacheContinuousQueryBatchAck.java | 4 +- .../CacheContinuousQueryEventBuffer.java | 5 +- .../continuous/CacheContinuousQueryHandler.java | 66 +- .../CacheContinuousQueryListener.java | 20 + .../continuous/CacheContinuousQueryManager.java | 59 +- .../query/continuous/CounterSkipContext.java | 78 + .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxEntry.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 26 +- .../transactions/IgniteTxLocalAdapter.java | 2 + .../cache/transactions/TxLocksRequest.java | 20 +- .../cache/transactions/TxLocksResponse.java | 28 +- .../cluster/GridClusterStateProcessor.java | 9 +- .../processors/query/GridQueryProcessor.java | 4 +- .../internal/processors/query/QueryUtils.java | 29 +- .../visor/cache/VisorCachePartitionsTask.java | 2 +- .../CacheAtomicSingleMessageCountSelfTest.java | 2 +- .../cache/CacheDeferredDeleteQueueTest.java | 2 +- ...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +- ...CacheExchangeMessageDuplicatedStateTest.java | 54 +- .../cache/CacheGroupsMetricsRebalanceTest.java | 140 + .../cache/CacheOffheapMapEntrySelfTest.java | 9 +- .../GridCacheConditionalDeploymentSelfTest.java | 18 + .../processors/cache/GridCacheLeakTest.java | 3 +- .../GridCacheOrderedPreloadingSelfTest.java | 14 +- .../cache/GridCacheTtlManagerSelfTest.java | 3 +- .../processors/cache/IgniteCacheGroupsTest.java | 3765 ++++++++++++++++++ .../cache/IgniteCachePeekModesAbstractTest.java | 2 +- .../processors/cache/IgniteCacheStartTest.java | 5 +- .../cache/IgniteOnePhaseCommitInvokeTest.java | 4 +- ...niteTopologyValidatorGridSplitCacheTest.java | 8 +- .../IgniteTxStoreExceptionAbstractSelfTest.java | 4 +- .../GridCacheBinaryObjectsAbstractSelfTest.java | 2 +- .../GridCacheQueueCleanupSelfTest.java | 13 +- .../GridCacheSetAbstractSelfTest.java | 17 +- .../GridCacheSetFailoverAbstractSelfTest.java | 6 +- .../IgnitePartitionedQueueNoBackupsTest.java | 6 +- .../IgnitePartitionedSetNoBackupsSelfTest.java | 6 +- .../CacheDiscoveryDataConcurrentJoinTest.java | 17 + .../distributed/CacheGroupsPreloadTest.java | 194 + .../CacheLateAffinityAssignmentTest.java | 6 +- .../IgniteCachePartitionLossPolicySelfTest.java | 19 +- .../IgniteCacheReadFromBackupTest.java | 5 +- ...sabledMultiNodeWithGroupFullApiSelfTest.java | 35 + .../atomic/IgniteCacheAtomicProtocolTest.java | 7 +- ...AtomicMultiNodeWithGroupFullApiSelfTest.java | 34 + ...nabledMultiNodeWithGroupFullApiSelfTest.java | 35 + .../near/GridCacheNearReadersSelfTest.java | 4 +- ...tionedMultiNodeWithGroupFullApiSelfTest.java | 34 + .../GridCacheReplicatedPreloadSelfTest.java | 3 +- .../IgniteCacheClientNearCacheExpiryTest.java | 20 +- .../expiry/IgniteCacheTtlCleanupSelfTest.java | 2 +- ...IgniteCacheJdbcBlobStoreNodeRestartTest.java | 3 + ...acheLocalAtomicWithGroupFullApiSelfTest.java | 34 + .../local/GridCacheLocalFullApiSelfTest.java | 1 - .../GridCacheLocalWithGroupFullApiSelfTest.java | 34 + ...nuousQueryConcurrentPartitionUpdateTest.java | 229 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 7 +- .../TxOptimisticDeadlockDetectionTest.java | 2 +- .../TxPessimisticDeadlockDetectionTest.java | 2 +- .../loadtests/hashmap/GridCacheTestContext.java | 6 +- .../communication/GridCacheMessageSelfTest.java | 30 + .../ignite/testframework/GridTestUtils.java | 9 + .../testframework/junits/GridAbstractTest.java | 17 + .../junits/common/GridCommonAbstractTest.java | 64 + .../IgniteCacheFullApiSelfTestSuite.java | 13 + .../IgniteCacheMetricsSelfTestSuite.java | 3 + .../testsuites/IgniteCacheTestSuite3.java | 4 + .../testsuites/IgniteCacheTestSuite4.java | 2 + .../processors/query/h2/IgniteH2Indexing.java | 3 +- .../query/h2/database/H2PkHashIndex.java | 4 +- .../query/h2/database/H2RowFactory.java | 2 +- .../processors/query/h2/database/H2Tree.java | 6 +- .../query/h2/database/H2TreeIndex.java | 26 +- .../h2/twostep/GridReduceQueryExecutor.java | 12 +- .../cache/IgniteCacheGroupsSqlTest.java | 317 ++ .../cache/IgniteCacheNoClassQuerySelfTest.java | 23 +- .../IgniteCacheWithIndexingTestSuite.java | 3 + .../GridCacheDatabaseSharedManager.java | 298 +- .../cache/database/GridCacheOffheapManager.java | 490 ++- .../database/file/FilePageStoreManager.java | 202 +- .../cache/database/pagemem/PageMemoryEx.java | 8 +- .../cache/database/pagemem/PageMemoryImpl.java | 10 +- .../wal/serializer/RecordV1Serializer.java | 12 +- .../IgnitePersistentStoreCacheGroupsTest.java | 516 +++ ...IgnitePersistentStoreDataStructuresTest.java | 205 + ...tentStoreMultiNodePutGetRestartSelfTest.java | 2 - ...dexingAndGroupPutGetPersistenceSelfTest.java | 42 + .../database/pagemem/NoOpPageStoreManager.java | 22 +- .../ignite/testsuites/IgnitePdsTestSuite.java | 5 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 5 +- modules/yardstick/pom.xml | 6 + .../yardstick/IgniteBenchmarkArguments.java | 39 +- .../org/apache/ignite/yardstick/IgniteNode.java | 23 +- .../cache/IgniteCacheAbstractBenchmark.java | 158 + .../yardstick/cache/IgniteGetAllBenchmark.java | 3 + .../cache/IgniteGetAllPutAllTxBenchmark.java | 2 + .../cache/IgniteGetAndPutBenchmark.java | 2 + .../cache/IgniteGetAndPutTxBenchmark.java | 2 + .../yardstick/cache/IgniteGetBenchmark.java | 35 +- .../IgniteGetEntriesPutAllTxBenchmark.java | 2 + .../yardstick/cache/IgniteInvokeBenchmark.java | 2 + .../cache/IgniteInvokeTxBenchmark.java | 2 + .../IgniteInvokeWithInjectionBenchmark.java | 2 + .../yardstick/cache/IgnitePutAllBenchmark.java | 2 + .../IgnitePutAllSerializableTxBenchmark.java | 2 + .../yardstick/cache/IgnitePutBenchmark.java | 2 + .../cache/IgnitePutGetBatchBenchmark.java | 2 + .../yardstick/cache/IgnitePutGetBenchmark.java | 2 + .../cache/IgnitePutGetEntryBenchmark.java | 2 + .../cache/IgnitePutGetEntryTxBenchmark.java | 2 + .../cache/IgnitePutGetTxBatchBenchmark.java | 2 + .../cache/IgnitePutGetTxBenchmark.java | 2 + ...IgnitePutIfAbsentIndexedValue1Benchmark.java | 2 + .../cache/IgnitePutIndexedValue1Benchmark.java | 2 + .../cache/IgnitePutIndexedValue2Benchmark.java | 2 + .../cache/IgnitePutIndexedValue8Benchmark.java | 2 + .../IgnitePutRandomValueSizeBenchmark.java | 2 + .../cache/IgnitePutRemoveBenchmark.java | 2 + .../yardstick/cache/IgnitePutTxBenchmark.java | 2 + .../cache/IgnitePutTxImplicitBenchmark.java | 2 + .../cache/IgnitePutTxPrimaryOnlyBenchmark.java | 2 + .../IgnitePutTxSkipLocalBackupBenchmark.java | 2 + .../cache/IgnitePutValue8Benchmark.java | 2 + .../IgniteReplaceIndexedValue1Benchmark.java | 2 + .../cache/IgniteScanQueryBenchmark.java | 88 + .../cache/IgniteSqlQueryBenchmark.java | 16 +- .../IgniteSqlQueryDistributedJoinBenchmark.java | 24 +- .../cache/IgniteSqlQueryJoinBenchmark.java | 14 +- .../cache/IgniteSqlQueryPutBenchmark.java | 8 +- .../IgniteSqlQueryPutSeparatedBenchmark.java | 4 + .../cache/jdbc/JdbcPutGetBenchmark.java | 4 +- .../IgniteCacheRandomOperationBenchmark.java | 46 +- .../yardstick/cache/load/model/ModelUtil.java | 6 +- 260 files changed, 14449 insertions(+), 4079 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 21f2fba..67b7590 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -202,6 +202,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Cache name. */ private String name; + /** Cache group name. */ + private String grpName; + /** Name of {@link MemoryPolicyConfiguration} for this cache */ private String memPlcName; @@ -408,6 +411,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { evictFilter = cc.getEvictionFilter(); evictPlc = cc.getEvictionPolicy(); expiryPolicyFactory = cc.getExpiryPolicyFactory(); + grpName = cc.getGroupName(); indexedTypes = cc.getIndexedTypes(); interceptor = cc.getInterceptor(); invalidate = cc.isInvalidate(); @@ -455,6 +459,23 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * @return Cache group name. + */ + public String getGroupName() { + return grpName; + } + + /** + * @param grpName Cache group name. + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setGroupName(String grpName) { + this.grpName = grpName; + + return this; + } + + /** * Cache name or {@code null} if not provided, then this will be considered a default * cache which can be accessed via {@link Ignite#cache(String)} method. Otherwise, if name * is provided, the cache will be accessed via {@link Ignite#cache(String)} method. http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 698baf8..ea49dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1008,7 +1008,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } default: - assert plc >= 0 : "Negative policy: " + plc; + assert plc >= 0 : "Negative policy [plc=" + plc + ", msg=" + msg + ']'; if (isReservedGridIoPolicy(plc)) throw new IgniteCheckedException("Failed to process message with policy of reserved range. " + @@ -1153,10 +1153,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa pools.poolForPolicy(plc).execute(c); } catch (RejectedExecutionException e) { - U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " + - "message in the listener thread instead.", e); + if (!ctx.isStopping()) { + U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " + + "message in the listener thread instead.", e); - c.run(); + c.run(); + } + else if (log.isDebugEnabled()) + log.debug("Failed to process regular message due to execution rejection: " + msg); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 5247ac1..22c2d07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -69,7 +69,7 @@ public class DiscoCache { /** Affinity cache nodes by cache name. */ @GridToStringInclude - private final Map<Integer, List<ClusterNode>> affCacheNodes; + private final Map<Integer, List<ClusterNode>> cacheGrpAffNodes; /** Node map. */ private final Map<UUID, ClusterNode> nodeMap; @@ -91,7 +91,7 @@ public class DiscoCache { * @param allNodesWithCaches All nodes with at least one cache configured. * @param rmtNodesWithCaches Remote nodes with at least one cache configured. * @param allCacheNodes Cache nodes by cache name. - * @param affCacheNodes Affinity cache nodes by cache name. + * @param cacheGrpAffNodes Affinity nodes by cache group ID. * @param nodeMap Node map. * @param nearEnabledCaches Caches where at least one node has near cache enabled. * @param alives Alive nodes. @@ -105,7 +105,7 @@ public class DiscoCache { List<ClusterNode> allNodesWithCaches, List<ClusterNode> rmtNodesWithCaches, Map<Integer, List<ClusterNode>> allCacheNodes, - Map<Integer, List<ClusterNode>> affCacheNodes, + Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Map<UUID, ClusterNode> nodeMap, Set<Integer> nearEnabledCaches, Set<UUID> alives) { @@ -118,7 +118,7 @@ public class DiscoCache { this.allNodesWithCaches = allNodesWithCaches; this.rmtNodesWithCaches = rmtNodesWithCaches; this.allCacheNodes = allCacheNodes; - this.affCacheNodes = affCacheNodes; + this.cacheGrpAffNodes = cacheGrpAffNodes; this.nodeMap = nodeMap; this.nearEnabledCaches = nearEnabledCaches; this.alives.addAll(alives); @@ -235,25 +235,11 @@ public class DiscoCache { } /** - * Gets all nodes that have cache with given ID and should participate in affinity calculation. With - * partitioned cache nodes with near-only cache do not participate in affinity node calculation. - * - * @param cacheName Cache name. - * @return Collection of nodes. - */ - public List<ClusterNode> cacheAffinityNodes(@Nullable String cacheName) { - return cacheAffinityNodes(CU.cacheId(cacheName)); - } - - /** - * Gets all nodes that have cache with given ID and should participate in affinity calculation. With - * partitioned cache nodes with near-only cache do not participate in affinity node calculation. - * - * @param cacheId Cache ID. - * @return Collection of nodes. + * @param grpId Cache group ID. + * @return All nodes that participate in affinity calculation. */ - public List<ClusterNode> cacheAffinityNodes(int cacheId) { - return emptyIfNull(affCacheNodes.get(cacheId)); + public List<ClusterNode> cacheGroupAffinityNodes(int grpId) { + return emptyIfNull(cacheGrpAffNodes.get(grpId)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 7c702c2..e144d9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -69,7 +69,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -251,12 +251,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { new ConcurrentHashMap8<>(); /** Local node initialization event listeners. */ - private final Collection<IgniteInClosure<ClusterNode>> localNodeInitLsnrs = new ArrayList<>(); + private final Collection<IgniteInClosure<ClusterNode>> locNodeInitLsnrs = new ArrayList<>(); /** Map of dynamic cache filters. */ private Map<String, CachePredicate> registeredCaches = new HashMap<>(); /** */ + private Map<Integer, CacheGroupAffinity> registeredCacheGrps = new HashMap<>(); + + /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** Received custom messages history. */ @@ -321,24 +324,55 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * + */ + public void onLocalNodeJoin() { + registeredCacheGrps.clear(); + registeredCaches.clear(); + } + + /** + * @param grpDesc Cache group descriptor. + * @param filter Node filter. + * @param cacheMode Cache mode. + */ + public void addCacheGroup(CacheGroupDescriptor grpDesc, IgnitePredicate<ClusterNode> filter, CacheMode cacheMode) { + CacheGroupAffinity old = registeredCacheGrps.put(grpDesc.groupId(), + new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter, cacheMode)); + + assert old == null : old; + } + + /** + * @param grpDesc Cache group descriptor. + */ + public void removeCacheGroup(CacheGroupDescriptor grpDesc) { + CacheGroupAffinity rmvd = registeredCacheGrps.remove(grpDesc.groupId()); + + assert rmvd != null : grpDesc.cacheOrGroupName(); + } + + /** * Adds dynamic cache filter. * + * @param grpId Cache group ID. * @param cacheName Cache name. - * @param filter Cache filter. * @param nearEnabled Near enabled flag. - * @param cacheMode Cache mode. */ public void setCacheFilter( + int grpId, String cacheName, - IgnitePredicate<ClusterNode> filter, - boolean nearEnabled, - CacheMode cacheMode + boolean nearEnabled ) { if (!registeredCaches.containsKey(cacheName)) { - if (cacheMode == CacheMode.REPLICATED) + CacheGroupAffinity grp = registeredCacheGrps.get(grpId); + + assert grp != null : "Failed to find cache group [grpId=" + grpId + ", cache=" + cacheName + ']'; + + if (grp.cacheMode == CacheMode.REPLICATED) nearEnabled = false; - - registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode)); + + registeredCaches.put(cacheName, new CachePredicate(grp, nearEnabled)); } } @@ -401,7 +435,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - return res; + return res == null ? Collections.<String, Map<UUID,Boolean>>emptyMap() : res; } /** @@ -493,7 +527,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** {@inheritDoc} */ @Override public void onLocalNodeInitialized(ClusterNode locNode) { - for (IgniteInClosure<ClusterNode> lsnr : localNodeInitLsnrs) + for (IgniteInClosure<ClusterNode> lsnr : locNodeInitLsnrs) lsnr.apply(locNode); } @@ -639,6 +673,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { locJoin = new GridFutureAdapter<>(); registeredCaches.clear(); + registeredCacheGrps.clear(); for (AffinityTopologyVersion histVer : discoCacheHist.keySet()) { Object rmvd = discoCacheHist.remove(histVer); @@ -801,7 +836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param lsnr Listener to add. */ public void addLocalNodeInitializedEventListener(IgniteInClosure<ClusterNode> lsnr) { - localNodeInitLsnrs.add(lsnr); + locNodeInitLsnrs.add(lsnr); } /** @@ -1742,27 +1777,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets cache nodes for cache with given name that participate in affinity calculation. - * - * @param cacheName Cache name. - * @param topVer Topology version. - * @return Collection of cache affinity nodes. - */ - public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) { - int cacheId = CU.cacheId(cacheName); - - return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId); - } - - /** * Gets cache nodes for cache with given ID that participate in affinity calculation. * - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param topVer Topology version. * @return Collection of cache affinity nodes. */ - public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) { - return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId); + public Collection<ClusterNode> cacheGroupAffinityNodes(int grpId, AffinityTopologyVersion topVer) { + return resolveDiscoCache(grpId, topVer).cacheGroupAffinityNodes(grpId); } /** @@ -1779,6 +1801,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Checks if node is a data node for the given cache group. + * + * @param node Node to check. + * @param grpId Cache group ID. + * @return {@code True} if node is a cache data node. + */ + public boolean cacheGroupAffinityNode(ClusterNode node, int grpId) { + CacheGroupAffinity aff = registeredCacheGrps.get(grpId); + + return CU.affinityNode(node, aff.cacheFilter); + } + + /** * @param node Node to check. * @param cacheName Cache name. * @return {@code True} if node has near cache enabled. @@ -1825,7 +1860,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (!CU.isSystemCache(cacheName) && !CU.isIgfsCache(ctx.config(), cacheName) && pred != null && pred.cacheNode(node)) - caches.put(cacheName, pred.cacheMode); + caches.put(cacheName, pred.aff.cacheMode); } return caches; @@ -1845,21 +1880,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Gets discovery cache for given topology version. * - * @param cacheId Cache ID (participates in exception message). + * @param grpId Cache group ID (participates in exception message). * @param topVer Topology version. * @return Discovery cache. */ - private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) { + private DiscoCache resolveDiscoCache(int grpId, AffinityTopologyVersion topVer) { Snapshot snap = topSnap.get(); DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ? snap.discoCache : discoCacheHist.get(topVer); if (cache == null) { - DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId); + CacheGroupDescriptor desc = ctx.cache().cacheGroupDescriptors().get(grpId); throw new IgniteException("Failed to resolve nodes topology [" + - "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") + + "cacheGrp=" + (desc != null ? desc.cacheOrGroupName() : "N/A") + ", topVer=" + topVer + ", history=" + discoCacheHist.keySet() + ", snap=" + snap + @@ -2073,7 +2108,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size()); - Map<Integer, List<ClusterNode>> affCacheNodes = U.newHashMap(allNodes.size()); + Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size()); Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); @@ -2085,6 +2120,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; assert !node.isDaemon(); + for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) { + CacheGroupAffinity grpAff = e.getValue(); + Integer grpId = e.getKey(); + + if (CU.affinityNode(node, grpAff.cacheFilter)) { + List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId); + + if (nodes == null) + cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>()); + + nodes.add(node); + } + } + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { String cacheName = entry.getKey(); CachePredicate filter = entry.getValue(); @@ -2100,9 +2149,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { addToMap(allCacheNodes, cacheName, node); - if (filter.dataNode(node)) - addToMap(affCacheNodes, cacheName, node); - if (filter.nearNode(node)) nearEnabledCaches.add(CU.cacheId(cacheName)); } @@ -2119,7 +2165,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { U.sealList(allNodesWithCaches), U.sealList(rmtNodesWithCaches), Collections.unmodifiableMap(allCacheNodes), - Collections.unmodifiableMap(affCacheNodes), + Collections.unmodifiableMap(cacheGrpAffNodes), Collections.unmodifiableMap(nodeMap), Collections.unmodifiableSet(nearEnabledCaches), alives); @@ -2732,32 +2778,60 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * + */ + private static class CacheGroupAffinity { + /** */ + private final String name; + + /** Nodes filter. */ + private final IgnitePredicate<ClusterNode> cacheFilter; + + /** Cache mode. */ + private final CacheMode cacheMode; + + /** + * @param name Name. + * @param cacheFilter Node filter. + * @param cacheMode Cache mode. + */ + CacheGroupAffinity( + String name, + IgnitePredicate<ClusterNode> cacheFilter, + CacheMode cacheMode) { + this.name = name; + this.cacheFilter = cacheFilter; + this.cacheMode = cacheMode; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheGroupAffinity [name=" + name + ']'; + } + } + + /** * Cache predicate. */ private static class CachePredicate { /** Cache filter. */ - private final IgnitePredicate<ClusterNode> cacheFilter; + private final CacheGroupAffinity aff; /** If near cache is enabled on data nodes. */ private final boolean nearEnabled; - /** Cache mode. */ - private final CacheMode cacheMode; - /** Collection of client near nodes. */ private final ConcurrentHashMap<UUID, Boolean> clientNodes; /** - * @param cacheFilter Cache filter. + * @param aff Cache group affinity. * @param nearEnabled Near enabled flag. - * @param cacheMode Cache mode. */ - private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) { - assert cacheFilter != null; + private CachePredicate(CacheGroupAffinity aff, boolean nearEnabled) { + assert aff != null; - this.cacheFilter = cacheFilter; + this.aff = aff; this.nearEnabled = nearEnabled; - this.cacheMode = cacheMode; clientNodes = new ConcurrentHashMap<>(); } @@ -2792,7 +2866,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if this node is a data node for given cache. */ public boolean dataNode(ClusterNode node) { - return CU.affinityNode(node, cacheFilter); + return CU.affinityNode(node, aff.cacheFilter); } /** @@ -2800,7 +2874,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if cache is accessible on the given node. */ public boolean cacheNode(ClusterNode node) { - return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id())); + return !node.isDaemon() && (CU.affinityNode(node, aff.cacheFilter) || clientNodes.containsKey(node.id())); } /** @@ -2808,7 +2882,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if near cache is present on the given nodes. */ public boolean nearNode(ClusterNode node) { - if (CU.affinityNode(node, cacheFilter)) + if (CU.affinityNode(node, aff.cacheFilter)) return nearEnabled; Boolean near = clientNodes.get(node.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java index f824368..3fa5954 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java @@ -138,6 +138,22 @@ public class PageUtils { /** * @param addr Address. * @param off Offset. + * @param bytes Bytes array. + * @param bytesOff Bytes array offset. + * @param len Length. + */ + public static void putBytes(long addr, int off, byte[] bytes, int bytesOff, int len) { + assert addr > 0 : addr; + assert off >= 0; + assert bytes != null; + assert bytesOff >= 0 && (bytesOff < bytes.length || bytes.length == 0) : bytesOff; + + GridUnsafe.copyMemory(bytes, GridUnsafe.BYTE_ARR_OFF + bytesOff, null, addr + off, len); + } + + /** + * @param addr Address. + * @param off Offset. * @param v Value. */ public static void putByte(long addr, int off, byte v) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java index 39a76dd..bdcc05a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java @@ -37,7 +37,10 @@ public class SnapshotOperation implements Serializable { */ private final long snapshotId; - /** */ + /** Cache group ids. */ + private final Set<Integer> cacheGrpIds; + + /** Cache names. */ private final Set<String> cacheNames; /** Message. */ @@ -49,6 +52,7 @@ public class SnapshotOperation implements Serializable { /** * @param type Type. * @param snapshotId Snapshot id. + * @param cacheGrpIds Cache group ids. * @param cacheNames Cache names. * @param msg * @param extraParam Additional parameter. @@ -56,12 +60,14 @@ public class SnapshotOperation implements Serializable { public SnapshotOperation( SnapshotOperationType type, long snapshotId, + Set<Integer> cacheGrpIds, Set<String> cacheNames, String msg, Object extraParam ) { this.type = type; this.snapshotId = snapshotId; + this.cacheGrpIds = cacheGrpIds; this.cacheNames = cacheNames; this.msg = msg; this.extraParam = extraParam; @@ -84,10 +90,17 @@ public class SnapshotOperation implements Serializable { } /** - * Cache names included to this snapshot. + * Cache group ids included to this snapshot. * * @return Cache names. */ + public Set<Integer> cacheGroupIds() { + return cacheGrpIds; + } + + /** + * Cache names included to this snapshot. + */ public Set<String> cacheNames() { return cacheNames; } @@ -170,6 +183,7 @@ public class SnapshotOperation implements Serializable { "type=" + type + ", snapshotId=" + snapshotId + ", cacheNames=" + cacheNames + + ", cacheGroupIds=" + cacheGrpIds + ", msg='" + msg + '\'' + ", extraParam=" + extraParam + '}'; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 0453ecb..11a3804 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -17,14 +17,14 @@ package org.apache.ignite.internal.pagemem.store; -import java.util.Set; +import java.nio.ByteBuffer; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; - -import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; /** @@ -44,39 +44,40 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh /** * Callback called when a cache is starting. * + * @param grpDesc Cache group descriptor. * @param ccfg Cache configuration of the cache being started. * @throws IgniteCheckedException If failed to handle cache start callback. */ - public void initializeForCache(CacheConfiguration ccfg) throws IgniteCheckedException; + public void initializeForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException; /** * Callback called when a cache is stopping. After this callback is invoked, no data associated with * the given cache will be stored on disk. * - * @param cacheCtx Cache context of the cache being stopped. + * @param grp Cache group being stopped. * @param destroy Flag indicating if the cache is being destroyed and data should be cleaned. * @throws IgniteCheckedException If failed to handle cache destroy callback. */ - public void shutdownForCache(GridCacheContext cacheCtx, boolean destroy) throws IgniteCheckedException; + public void shutdownForCacheGroup(CacheGroupContext grp, boolean destroy) throws IgniteCheckedException; /** * Callback called when a partition is created on the local node. * - * @param cacheId Cache ID where the partition is being created. + * @param grpId Cache group ID where the partition is being created. * @param partId ID of the partition being created. * @throws IgniteCheckedException If failed to handle partition create callback. */ - public void onPartitionCreated(int cacheId, int partId) throws IgniteCheckedException; + public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException; /** * Callback called when a partition for the given cache is evicted from the local node. * After this callback is invoked, no data associated with the partition will be stored on disk. * - * @param cacheId Cache ID of the evicted partition. + * @param grpId Cache group ID of the evicted partition. * @param partId Partition ID. * @throws IgniteCheckedException If failed to handle partition destroy callback. */ - public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException; + public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException; /** * Reads a page for the given cache ID. Cache ID may be {@code 0} if the page is a meta page. @@ -89,17 +90,17 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh public void read(int cacheId, long pageId, ByteBuffer pageBuf) throws IgniteCheckedException; /** - * Checks if page exists. + * Checks if partition store exists. * - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param partId Partition ID. - * @return {@code True} if page exists. + * @return {@code True} if partition store exists. * @throws IgniteCheckedException If failed. */ - public boolean exists(int cacheId, int partId) throws IgniteCheckedException; + public boolean exists(int grpId, int partId) throws IgniteCheckedException; /** - * Reads a header of apage store. + * Reads a header of a page store. * * @param cacheId Cache ID. * @param partId Partition ID. @@ -174,19 +175,14 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh public long metaPageId(int cacheId); /** - * @return set of cache names which configurations were saved - */ - public Set<String> savedCacheNames(); - - /** - * @param cacheName Cache name. - * @return saved configuration for cache + * @return Saved cache configurations. + * @throws IgniteCheckedException If failed. */ - public CacheConfiguration readConfiguration(String cacheName); + public Map<String, CacheConfiguration> readCacheConfigurations() throws IgniteCheckedException; /** - * @param cacheId Cache ID. - * @return {@code True} if index store for given cache existed before node started. + * @param grpId Cache group ID. + * @return {@code True} if index store for given cache group existed before node started. */ - public boolean hasIndexStore(int cacheId); + public boolean hasIndexStore(int grpId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java index 7aaf1c5..cfcd62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java @@ -35,7 +35,7 @@ public class CheckpointRecord extends WALRecord { private boolean end; /** */ - private Map<Integer, CacheState> cacheStates; + private Map<Integer, CacheState> cacheGrpStates; /** Safe replay pointer. */ private WALPointer cpMark; @@ -65,28 +65,28 @@ public class CheckpointRecord extends WALRecord { } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param state Cache state. */ - public void addCacheState(int cacheId, CacheState state) { - if (cacheStates == null) - cacheStates = new HashMap<>(); + public void addCacheGroupState(int grpId, CacheState state) { + if (cacheGrpStates == null) + cacheGrpStates = new HashMap<>(); - cacheStates.put(cacheId, state); + cacheGrpStates.put(grpId, state); } /** - * @param cacheStates Cache states. + * @param cacheGrpStates Cache states. */ - public void cacheStates(Map<Integer, CacheState> cacheStates) { - this.cacheStates = cacheStates; + public void cacheGroupStates(Map<Integer, CacheState> cacheGrpStates) { + this.cacheGrpStates = cacheGrpStates; } /** * @return Cache states. */ - public Map<Integer, CacheState> cacheStates() { - return cacheStates != null ? cacheStates : Collections.<Integer, CacheState>emptyMap(); + public Map<Integer, CacheState> cacheGroupStates() { + return cacheGrpStates != null ? cacheGrpStates : Collections.<Integer, CacheState>emptyMap(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java index ef57c46..b28dd52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java @@ -41,6 +41,9 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { /** */ private int allocatedIdxCandidate; + /** */ + private long cntrsPageId; + /** * @param cacheId Cache ID. * @param pageId Page ID. @@ -52,7 +55,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { long updateCntr, long globalRmvId, int partSize, - byte state, + long cntrsPageId, byte state, int allocatedIdxCandidate ) { super(cacheId, pageId); @@ -62,6 +65,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { this.partSize = partSize; this.state = state; this.allocatedIdxCandidate = allocatedIdxCandidate; + this.cntrsPageId = cntrsPageId; } /** @@ -86,6 +90,13 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { } /** + * @return Partition size. + */ + public long countersPageId() { + return cntrsPageId; + } + + /** * @return Partition state */ public byte state() { @@ -99,6 +110,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { io.setUpdateCounter(pageAddr, updateCntr); io.setGlobalRemoveId(pageAddr, globalRmvId); io.setSize(pageAddr, partSize); + io.setCountersPageId(pageAddr, cntrsPageId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 4830f06..b478462 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -60,11 +60,11 @@ public class GridAffinityAssignmentCache { /** Cleanup history size. */ private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500); - /** Cache name. */ - private final String cacheName; + /** Group name if specified or cache name. */ + private final String cacheOrGrpName; - /** */ - private final int cacheId; + /** Group ID. */ + private final int grpId; /** Number of backups. */ private final int backups; @@ -115,7 +115,8 @@ public class GridAffinityAssignmentCache { * Constructs affinity cached calculations. * * @param ctx Kernal context. - * @param cacheName Cache name. + * @param cacheOrGrpName Cache or cache group name. + * @param grpId Group ID. * @param aff Affinity function. * @param nodeFilter Node filter. * @param backups Number of backups. @@ -123,7 +124,8 @@ public class GridAffinityAssignmentCache { */ @SuppressWarnings("unchecked") public GridAffinityAssignmentCache(GridKernalContext ctx, - String cacheName, + String cacheOrGrpName, + int grpId, AffinityFunction aff, IgnitePredicate<ClusterNode> nodeFilter, int backups, @@ -132,16 +134,16 @@ public class GridAffinityAssignmentCache { assert ctx != null; assert aff != null; assert nodeFilter != null; + assert grpId != 0; this.ctx = ctx; this.aff = aff; this.nodeFilter = nodeFilter; - this.cacheName = cacheName; + this.cacheOrGrpName = cacheOrGrpName; + this.grpId = grpId; this.backups = backups; this.locCache = locCache; - cacheId = CU.cacheId(cacheName); - log = ctx.log(GridAffinityAssignmentCache.class); partsCnt = aff.partitions(); @@ -161,17 +163,17 @@ public class GridAffinityAssignmentCache { } /** - * @return Cache name. + * @return Group name if it is specified, otherwise cache name. */ - public String cacheName() { - return cacheName; + public String cacheOrGroupName() { + return cacheOrGrpName; } /** - * @return Cache ID. + * @return Cache group ID. */ - public int cacheId() { - return cacheId; + public int groupId() { + return grpId; } /** @@ -269,7 +271,7 @@ public class GridAffinityAssignmentCache { List<ClusterNode> sorted; if (!locCache) { - sorted = new ArrayList<>(discoCache.cacheAffinityNodes(cacheId())); + sorted = new ArrayList<>(discoCache.cacheGroupAffinityNodes(groupId())); Collections.sort(sorted, GridNodeOrderComparator.INSTANCE); } @@ -432,7 +434,7 @@ public class GridAffinityAssignmentCache { */ public void dumpDebugInfo() { if (!readyFuts.isEmpty()) { - U.warn(log, "Pending affinity ready futures [cache=" + cacheName + ", lastVer=" + lastVersion() + "]:"); + U.warn(log, "Pending affinity ready futures [grp=" + cacheOrGrpName + ", lastVer=" + lastVersion() + "]:"); for (AffinityReadyFuture fut : readyFuts.values()) U.warn(log, ">>> " + fut); @@ -461,7 +463,7 @@ public class GridAffinityAssignmentCache { if (cache == null) { throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + "calculated [locNode=" + ctx.discovery().localNode() + - ", cache=" + cacheName + + ", grp=" + cacheOrGrpName + ", topVer=" + topVer + ", head=" + head.get().topologyVersion() + ", history=" + affCache.keySet() +
