http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 4bd2372..625fb7d 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -65,6 +65,8 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan; @@ -1731,4 +1733,66 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { return U.field(tm, "completedVersHashMap"); } + + /** + * Checks that cache group descriptors and cache descriptors are the same on every node returned by {@code G.allGrids()}; the first node's descriptors are the reference that each following node's own descriptors are compared against. + */ + protected final void checkCacheDiscoveryDataConsistent() { + Map<Integer, CacheGroupDescriptor> cacheGrps = null; + Map<String, DynamicCacheDescriptor> caches = null; + + for (Ignite node : G.allGrids()) { + Map<Integer, CacheGroupDescriptor> cacheGrps0 = + ((IgniteKernal)node).context().cache().cacheGroupDescriptors(); + Map<String, DynamicCacheDescriptor> caches0 = + ((IgniteKernal)node).context().cache().cacheDescriptors(); + + assertNotNull(cacheGrps0); + assertNotNull(caches0); + + if 
(cacheGrps == null) { + cacheGrps = cacheGrps0; + caches = caches0; + } + else { + assertEquals(cacheGrps.size(), cacheGrps0.size()); + + for (Map.Entry<Integer, CacheGroupDescriptor> e : cacheGrps.entrySet()) { + CacheGroupDescriptor desc = e.getValue(); + CacheGroupDescriptor desc0 = cacheGrps0.get(e.getKey()); + + assertNotNull(desc0); + checkGroupDescriptorsData(desc, desc0); + } + + for (Map.Entry<String, DynamicCacheDescriptor> e : caches.entrySet()) { + DynamicCacheDescriptor desc = e.getValue(); + DynamicCacheDescriptor desc0 = caches0.get(e.getKey()); + + assertNotNull(desc0); + assertEquals(desc.deploymentId(), desc0.deploymentId()); + assertEquals(desc.receivedFrom(), desc0.receivedFrom()); + assertEquals(desc.startTopologyVersion(), desc0.startTopologyVersion()); + assertEquals(desc.cacheConfiguration().getName(), desc0.cacheConfiguration().getName()); + assertEquals(desc.cacheConfiguration().getGroupName(), desc0.cacheConfiguration().getGroupName()); + checkGroupDescriptorsData(desc.groupDescriptor(), desc0.groupDescriptor()); + } + } + } + } + + /** + * @param desc First descriptor. + * @param desc0 Second descriptor. + */ + private void checkGroupDescriptorsData(CacheGroupDescriptor desc, CacheGroupDescriptor desc0) { + assertEquals(desc.groupName(), desc0.groupName()); + assertEquals(desc.sharedGroup(), desc0.sharedGroup()); + assertEquals(desc.deploymentId(), desc0.deploymentId()); + assertEquals(desc.receivedFrom(), desc0.receivedFrom()); + assertEquals(desc.startTopologyVersion(), desc0.startTopologyVersion()); + assertEquals(desc.config().getName(), desc0.config().getName()); + assertEquals(desc.config().getGroupName(), desc0.config().getGroupName()); + assertEquals(desc.caches(), desc0.caches()); + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java index 710b2a1..11a4a10 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledMultiNodeP2PDisabledFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledMultiNodeWithGroupFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOnheapFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOnheapMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeoutFullApiTest; @@ -38,7 +39,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAto import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicLateAffDisabledMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicMultiNodeFullApiSelfTest; import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicMultiNodeWithGroupFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearEnabledMultiNodeFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearEnabledMultiNodeWithGroupFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicNearOnlyMultiNodeP2PDisabledFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicOnheapFullApiSelfTest; @@ -58,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePar import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeCounterSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeP2PDisabledFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedMultiNodeWithGroupFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedNearOnlyNoPrimaryFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOnheapFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedOnheapMultiNodeFullApiSelfTest; @@ -71,8 +75,10 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedMultiNodeP2PDisabledFullApiSelfTest; import 
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedNearOnlyMultiNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicWithGroupFullApiSelfTest; import org.apache.ignite.internal.processors.cache.local.GridCacheLocalFullApiMultithreadedSelfTest; import org.apache.ignite.internal.processors.cache.local.GridCacheLocalFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.local.GridCacheLocalWithGroupFullApiSelfTest; /** * Test suite for cache API. @@ -161,6 +167,13 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite { // Other. suite.addTestSuite(GridCacheClearSelfTest.class); + suite.addTestSuite(GridCacheLocalWithGroupFullApiSelfTest.class); + suite.addTestSuite(GridCacheLocalAtomicWithGroupFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicMultiNodeWithGroupFullApiSelfTest.class); + suite.addTestSuite(GridCacheAtomicNearEnabledMultiNodeWithGroupFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedMultiNodeWithGroupFullApiSelfTest.class); + suite.addTestSuite(GridCachePartitionedNearDisabledMultiNodeWithGroupFullApiSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java index ebcf1df..d3471ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; 
import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.CacheGroupsMetricsRebalanceTest; import org.apache.ignite.internal.processors.cache.CacheMetricsForClusterGroupSelfTest; import org.apache.ignite.internal.processors.cache.OffheapCacheMetricsForClusterGroupSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPartitionedMetricsSelfTest; @@ -57,6 +58,8 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class); suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class); + suite.addTestSuite(CacheGroupsMetricsRebalanceTest.class); + // Cluster wide metrics. suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class); suite.addTestSuite(OffheapCacheMetricsForClusterGroupSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index 222ac30..feb2cdf 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -32,9 +32,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheValueConsistencyTran import org.apache.ignite.internal.processors.cache.GridCacheValueConsistencyTransactionalSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheVersionSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheVersionTopologyChangeTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheGroupsTest; import org.apache.ignite.internal.processors.cache.IgniteCacheInterceptorSelfTestSuite; import 
org.apache.ignite.internal.processors.cache.IgniteCacheScanPredicateDeploymentSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAsyncOperationsTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMixedModeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxGetAfterStopTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDaemonNodePartitionedSelfTest; @@ -195,6 +197,8 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(CacheAsyncOperationsTest.class); + suite.addTestSuite(IgniteCacheGroupsTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 23f8445..1b35acb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarl import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest; import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import 
org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; @@ -305,6 +306,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class); suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class); suite.addTestSuite(GridReplicatedTxPreloadTest.class); + suite.addTestSuite(CacheGroupsPreloadTest.class); suite.addTestSuite(IgniteDynamicCacheFilterTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 63e56ca..41176e1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1734,7 +1734,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { IgniteCacheOffheapManager offheapMgr = cctx.isNear() ? cctx.near().dht().context().offheap() : cctx.offheap(); for (int p = 0; p < cctx.affinity().partitions(); p++) { - try (GridCloseableIterator<KeyCacheObject> keyIter = offheapMgr.keysIterator(p)) { + try (GridCloseableIterator<KeyCacheObject> keyIter = offheapMgr.cacheKeysIterator(cctx.cacheId(), p)) { while (keyIter.hasNext()) { cctx.shared().database().checkpointReadLock(); @@ -2283,5 +2283,4 @@ public class IgniteH2Indexing implements GridQueryIndexing { private interface ClIter<X> extends AutoCloseable, Iterator<X> { // No-op. 
} - } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index 7caf354..0440615 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -108,7 +108,7 @@ public class H2PkHashIndex extends GridH2IndexBase { List<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>(); for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) - cursors.add(store.cursor(lowerObj, upperObj)); + cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj)); return new H2Cursor(new CompositeGridCursor<>(cursors.iterator()), p); } @@ -126,7 +126,7 @@ public class H2PkHashIndex extends GridH2IndexBase { @Override public GridH2Row findOne(GridH2Row row) { try { for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) { - CacheDataRow found = store.find(row.key); + CacheDataRow found = store.find(cctx, row.key); if (found != null) tbl.rowDescriptor().createRow(row.key(), row.partition(), row.value(), row.version(), 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java index 2024c36..86b2749 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java @@ -59,7 +59,7 @@ public class H2RowFactory { final CacheDataRowAdapter rowBuilder = new CacheDataRowAdapter(link); - rowBuilder.initFromLink(cctx, CacheDataRowAdapter.RowData.FULL); + rowBuilder.initFromLink(cctx.group(), CacheDataRowAdapter.RowData.FULL); GridH2Row row; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java index f673717..f2b8bad 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java @@ -63,7 +63,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { /** * @param name Tree name. * @param reuseList Reuse list. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param pageMem Page memory. * @param wal Write ahead log manager. * @param rowStore Row data store. 
@@ -74,7 +74,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { protected H2Tree( String name, ReuseList reuseList, - int cacheId, + int grpId, PageMemory pageMem, IgniteWriteAheadLogManager wal, AtomicLong globalRmvId, @@ -85,7 +85,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> { List<InlineIndexHelper> inlineIdxs, int inlineSize ) throws IgniteCheckedException { - super(name, cacheId, pageMem, wal, globalRmvId, metaPageId, reuseList); + super(name, grpId, pageMem, wal, globalRmvId, metaPageId, reuseList); if (!initNew) { // Page is ready - read inline size from it. http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index c1c1d9c..b84daf1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -24,7 +24,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.database.RootPage; import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; @@ -100,8 +99,6 @@ public class H2TreeIndex extends GridH2IndexBase { name = BPlusTree.treeName(name, "H2Tree"); if 
(cctx.affinityNode()) { - IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database(); - inlineIdxs = getAvailableInlineColumns(cols); segments = new H2Tree[segmentsCnt]; @@ -110,8 +107,9 @@ public class H2TreeIndex extends GridH2IndexBase { RootPage page = getMetaPage(name, i); segments[i] = new H2Tree( - name,cctx.offheap().reuseListForIndex(name), - cctx.cacheId(), + name, + cctx.offheap().reuseListForIndex(name), + cctx.groupId(), cctx.memoryPolicy().pageMemory(), cctx.shared().wal(), cctx.offheap().globalRemoveId(), @@ -326,10 +324,12 @@ public class H2TreeIndex extends GridH2IndexBase { @Override public void destroy() { try { if (cctx.affinityNode()) { - for (H2Tree tree : segments) { + for (int i = 0; i < segments.length; i++) { + H2Tree tree = segments[i]; + tree.destroy(); - cctx.offheap().dropRootPageForIndex(tree.getName()); + dropMetaPage(tree.getName(), i); } } } @@ -413,10 +413,20 @@ public class H2TreeIndex extends GridH2IndexBase { /** * @param name Name. + * @param segIdx Segment index. * @return RootPage for meta page. * @throws IgniteCheckedException If failed. */ private RootPage getMetaPage(String name, int segIdx) throws IgniteCheckedException { - return cctx.offheap().rootPageForIndex(name + "%" + segIdx); + return cctx.offheap().rootPageForIndex(cctx.cacheId(), name + "%" + segIdx); + } + + /** + * @param name Name. + * @param segIdx Segment index. + * @throws IgniteCheckedException If failed. 
+ */ + private void dropMetaPage(String name, int segIdx) throws IgniteCheckedException { + cctx.offheap().dropRootPageForIndex(cctx.cacheId(), name + "%" + segIdx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 2d12635..c1cbada 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -1026,12 +1026,12 @@ public class GridReduceQueryExecutor { } /** - * @param cacheName Cache name. + * @param grpId Cache group ID. * @param topVer Topology version. * @return Collection of data nodes. */ - private Collection<ClusterNode> dataNodes(String cacheName, AffinityTopologyVersion topVer) { - Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(cacheName, topVer); + private Collection<ClusterNode> dataNodes(int grpId, AffinityTopologyVersion topVer) { + Collection<ClusterNode> res = ctx.discovery().cacheGroupAffinityNodes(grpId, topVer); return res != null ? 
res : Collections.<ClusterNode>emptySet(); } @@ -1047,7 +1047,7 @@ public class GridReduceQueryExecutor { String cacheName = cctx.name(); - Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(cacheName, NONE)); + Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(cctx.groupId(), NONE)); if (dataNodes.isEmpty()) throw new CacheException("Failed to find data nodes for cache: " + cacheName); @@ -1110,7 +1110,7 @@ public class GridReduceQueryExecutor { continue; } - else if (!F.isEmpty(dataNodes(cctx.name(), NONE))) + else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) return null; // Retry. throw new CacheException("Failed to find data nodes [cache=" + cctx.name() + ", part=" + p + "]"); @@ -1139,7 +1139,7 @@ public class GridReduceQueryExecutor { continue; // Skip unmapped partitions. if (F.isEmpty(owners)) { - if (!F.isEmpty(dataNodes(extraCctx.name(), NONE))) + if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) return null; // Retry. throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java new file mode 100644 index 0000000..b538449 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsSqlTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.AffinityKey; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Tests SQL queries, including joins, over caches that are assigned to cache groups. + */ +public class IgniteCacheGroupsSqlTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every node configuration. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Name of the first cache group. */ + private static final String GROUP1 = "grp1"; + + /** Name of the second cache group. */ + private static final String GROUP2 = "grp2"; + + /** Client mode flag passed to the node configuration. */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testSqlQuery() throws Exception { + Ignite node = ignite(0); + + IgniteCache c1 = node.createCache(personCacheConfiguration(GROUP1, "c1")); + IgniteCache c2 = node.createCache(personCacheConfiguration(GROUP1, "c2")); + + SqlFieldsQuery qry = new SqlFieldsQuery("select name from Person where name=?"); + qry.setArgs("p1"); + + assertEquals(0, c1.query(qry).getAll().size()); + assertEquals(0, c2.query(qry).getAll().size()); + + c1.put(1, new Person("p1")); + + assertEquals(1, c1.query(qry).getAll().size()); + assertEquals(0, c2.query(qry).getAll().size()); + + c2.put(2, new Person("p1")); + + assertEquals(1, c1.query(qry).getAll().size()); + assertEquals(1, c2.query(qry).getAll().size()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testJoinQuery1() throws Exception { + joinQuery(GROUP1, GROUP2, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery2() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + joinQuery(GROUP1, GROUP1, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL); + return null; + } + }, IgniteCheckedException.class, "Cache mode mismatch for caches related to the same group"); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery3() throws Exception { + joinQuery(GROUP1, GROUP1, PARTITIONED, PARTITIONED, TRANSACTIONAL, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery4() throws Exception { + joinQuery(GROUP1, GROUP1, REPLICATED, REPLICATED, ATOMIC, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery5() throws Exception { + joinQuery(GROUP1, null, REPLICATED, PARTITIONED, TRANSACTIONAL, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery6() throws Exception { + joinQuery(GROUP1, null, PARTITIONED, PARTITIONED, TRANSACTIONAL, ATOMIC); + } + + /** + * @param grp1 First cache group. + * @param grp2 Second cache group. + * @param cm1 First cache mode. + * @param cm2 Second cache mode. + * @param cam1 First cache atomicity mode. + * @param cam2 Second cache atomicity mode. + * @throws Exception If failed. 
+ */ + private void joinQuery(String grp1, String grp2, CacheMode cm1, + CacheMode cm2, CacheAtomicityMode cam1, CacheAtomicityMode cam2) throws Exception { + int keys = 1000; + int accsPerPerson = 4; + + Ignite srv0 = ignite(0); + + IgniteCache pers = srv0.createCache(personCacheConfiguration(grp1, "pers") + .setAffinity(new RendezvousAffinityFunction().setPartitions(10)) + .setCacheMode(cm1) + .setAtomicityMode(cam1)); + + IgniteCache acc = srv0.createCache(accountCacheConfiguration(grp2, "acc") + .setAffinity(new RendezvousAffinityFunction().setPartitions(10)) + .setCacheMode(cm2) + .setAtomicityMode(cam2)); + + try(Transaction tx = cam1 == TRANSACTIONAL || cam2 == TRANSACTIONAL ? srv0.transactions().txStart() : null) { + for (int i = 0; i < keys; i++) { + + int pKey = i - (i % accsPerPerson); + + if (i % accsPerPerson == 0) + pers.put(pKey, new Person("pers-" + pKey)); + + + acc.put(new AffinityKey(i, pKey), new Account(pKey, "acc-" + i)); + } + + if (tx != null) + tx.commit(); + } + + Ignite node = ignite(2); + + SqlFieldsQuery qry = new SqlFieldsQuery( + "select p._key as p_key, p.name, a._key as a_key, a.personId, a.attr \n" + + "from \"pers\".Person p inner join \"acc\".Account a \n" + + "on (p._key = a.personId)"); + + IgniteCache<Object, Object> cache = node.cache("acc"); + + List<List<?>> res = cache.query(qry).getAll(); + + assertEquals(keys, res.size()); + + for (List<?> row : res) + assertEquals(row.get(0), row.get(3)); + } + + /** + * @param grpName Group name. + * @param cacheName Cache name. + * @return Person cache configuration. + */ + private CacheConfiguration personCacheConfiguration(String grpName, String cacheName) { + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + return cacheConfiguration(grpName, cacheName, entity); + } + + /** + * @param grpName Group name. 
+ * @param cacheName Cache name. + * @return Account cache configuration. + */ + private CacheConfiguration accountCacheConfiguration(String grpName, String cacheName) { + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(AffinityKey.class.getName()); + entity.setValueType(Account.class.getName()); + entity.addQueryField("personId", Integer.class.getName(), null); + entity.addQueryField("attr", String.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("personId"))); + + return cacheConfiguration(grpName, cacheName, entity); + } + + /** + * @param grpName Group name. + * @param cacheName Cache name. + * @param queryEntity Query entity. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String grpName, String cacheName, QueryEntity queryEntity) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setGroupName(grpName); + ccfg.setName(cacheName); + + ccfg.setQueryEntities(F.asList(queryEntity)); + + return ccfg; + } + + /** + * Person value used by the tests; its {@code name} field is registered as a query field in the person cache configuration. + */ + private static class Person implements Serializable { + /** Person name. */ + String name; + + /** + * @param name Name. + */ + public Person(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * Account value used by the tests; its {@code personId} and {@code attr} fields are registered as query fields in the account cache configuration. + */ + private static class Account implements Serializable { + /** Person ID. */ + Integer personId; + + /** Attribute (some data). */ + String attr; + + /** + * @param personId Person ID. + * @param attr Attribute (some data). 
+ */ + public Account(Integer personId, String attr) { + this.personId = personId; + this.attr = attr; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoClassQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoClassQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoClassQuerySelfTest.java index e0148b3..770770e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoClassQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoClassQuerySelfTest.java @@ -20,13 +20,10 @@ package org.apache.ignite.internal.processors.cache; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; -import org.apache.ignite.Ignite; -import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -44,23 +41,6 @@ public class IgniteCacheNoClassQuerySelfTest extends GridCommonAbstractTest { /** */ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ - protected Ignite ignite; - - /** - * @return Atomicity mode.
- */ - protected CacheAtomicityMode atomicityMode() { - return TRANSACTIONAL; - } - - /** - * @return Distribution. - */ - protected NearCacheConfiguration nearCacheConfiguration() { - return new NearCacheConfiguration(); - } - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -118,5 +98,8 @@ public class IgniteCacheNoClassQuerySelfTest extends GridCommonAbstractTest { catch (Exception e) { assertTrue(e.getMessage().contains("default marshaller")); } + finally { + stopAllGrids(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 0f4a418..794ec4d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvi import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexGetSelfTest; import org.apache.ignite.internal.processors.cache.GridIndexingWithNoopSwapSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationPrimitiveTypesSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheGroupsSqlTest; import org.apache.ignite.internal.processors.cache.IgniteCacheStarvationOnRebalanceTest; import org.apache.ignite.internal.processors.cache.IgniteClientReconnectQueriesTest; import org.apache.ignite.internal.processors.cache.ttl.CacheTtlAtomicLocalSelfTest; @@ -72,6 +73,8 
@@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(ClientReconnectAfterClusterRestartTest.class); + suite.addTestSuite(IgniteCacheGroupsSqlTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index 0f75b0d..1b36451 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -214,7 +215,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private volatile GridFutureAdapter<Void> enableChangeApplied; /** */ - public ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); + private ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); /** */ private long checkpointFreq; @@ -409,25 +410,25 @@ public class 
GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!reconnect && !cctx.kernalContext().clientNode() && cctx.kernalContext().state().active()) { Collection<String> cacheNames = new HashSet<>(); - for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) + // TODO IGNITE-5075 group descriptors. + for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) { if (CU.isSystemCache(ccfg.getName())) { - storeMgr.initializeForCache(ccfg); + storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg); cacheNames.add(ccfg.getName()); } + } for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) if (!CU.isSystemCache(ccfg.getName())) { - storeMgr.initializeForCache(ccfg); + storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg); cacheNames.add(ccfg.getName()); } - for (String name : cctx.pageStore().savedCacheNames()) { - CacheConfiguration ccfg = cctx.pageStore().readConfiguration(name); - - if (ccfg != null && !cacheNames.contains(name)) - storeMgr.initializeForCache(ccfg); + for (CacheConfiguration ccfg : cctx.pageStore().readCacheConfigurations().values()) { + if (!cacheNames.contains(ccfg.getName())) + storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg); } readCheckpointAndRestoreMemory(); @@ -548,7 +549,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - protected long[] calculateFragmentSizes(int concLvl, long cacheSize) { + private long[] calculateFragmentSizes(int concLvl, long cacheSize) { if (concLvl < 2) concLvl = Runtime.getRuntime().availableProcessors(); @@ -599,7 +600,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan FullPageId fullId, PageMemoryEx pageMem ) throws IgniteCheckedException { - 
snapshotMgr.onChangeTrackerPage(page,fullId,pageMem); + snapshotMgr.onChangeTrackerPage(page, fullId, pageMem); } }, this @@ -688,7 +689,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (cctx.kernalContext().query().moduleEnabled()) { for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) { if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) && - !cctx.pageStore().hasIndexStore(cacheCtx.cacheId()) && cacheCtx.affinityNode()) { + !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) { final int cacheId = cacheCtx.cacheId(); final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query() @@ -717,7 +718,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) { + @Override public void onCacheGroupsStopped( + Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps) { try { waitForCheckpoint("caches stop"); } @@ -727,30 +729,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>(); - for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) { + for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) { PageMemoryEx pageMem = (PageMemoryEx)tup.get1().memoryPolicy().pageMemory(); - Collection<Integer> cacheIds = destroyed.get(pageMem); + Collection<Integer> grpIds = destroyed.get(pageMem); - if (cacheIds == null) { - cacheIds = new HashSet<>(); + if (grpIds == null) { + grpIds = new HashSet<>(); - destroyed.put(pageMem, cacheIds); + destroyed.put(pageMem, grpIds); } - cacheIds.add(tup.get1().cacheId()); + grpIds.add(tup.get1().groupId()); - pageMem.onCacheDestroyed(tup.get1().cacheId()); + pageMem.onCacheGroupDestroyed(tup.get1().groupId()); } 
Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size()); for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) { - final Collection<Integer> cacheIds = entry.getValue(); + final Collection<Integer> grpIds = entry.getValue(); clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { - return cacheIds.contains(cacheId); + @Override public boolean apply(Integer grpId, Long pageId, Integer tag) { + return grpIds.contains(grpId); } }, false)); } @@ -764,15 +766,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (cctx.pageStore() != null) { - for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) { - GridCacheContext cacheCtx = tup.get1(); + for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) { + CacheGroupContext grp = tup.get1(); - try { - cctx.pageStore().shutdownForCache(cacheCtx, tup.get2()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully clean page store resources for destroyed cache " + - "[cache=" + cacheCtx.name() + "]", e); + if (grp.affinityNode()) { + try { + cctx.pageStore().shutdownForCacheGroup(grp, tup.get2()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to gracefully clean page store resources for destroyed cache " + + "[cache=" + grp.cacheOrGroupName() + "]", e); + } } } } @@ -849,15 +853,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointLock.readLock().unlock(); if (checkpointer != null) { - Collection<GridCacheContext> cacheCtxs = context().cacheContexts(); + Collection<MemoryPolicy> memPlcs = context().database().memoryPolicies(); - for (GridCacheContext cacheCtx : cacheCtxs) { - PageMemoryEx mem = (PageMemoryEx) cacheCtx.memoryPolicy().pageMemory(); + if (memPlcs != null) { + for (MemoryPolicy memPlc : memPlcs) { + PageMemoryEx mem = 
(PageMemoryEx)memPlc.pageMemory(); - if (mem != null && !mem.safeToUpdate()) { - checkpointer.wakeupForCheckpoint(0, "too many dirty pages"); + if (mem != null && !mem.safeToUpdate()) { + checkpointer.wakeupForCheckpoint(0, "too many dirty pages"); - break; + break; + } } } } @@ -869,7 +875,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @throws IgniteCheckedException If failed to restore database status from WAL. */ - public void restoreState() throws IgniteCheckedException { + private void restoreState() throws IgniteCheckedException { try { CheckpointStatus status = readCheckpointStatus(); @@ -899,27 +905,27 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan reservedForExchange = new HashMap<>(); - for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions()) { - if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().size() <= walRebalanceThreshold) + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) { + if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().fullSize() <= walRebalanceThreshold) continue; - CheckpointEntry cpEntry = searchCheckpointEntry(cacheCtx, part.id(), null); + CheckpointEntry cpEntry = searchCheckpointEntry(grp.groupId(), part.id(), null); try { if (cpEntry != null && cctx.wal().reserve(cpEntry.cpMark)) { - Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId()); + Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(grp.groupId()); if (cacheMap == null) { cacheMap = new HashMap<>(); - reservedForExchange.put(cacheCtx.cacheId(), cacheMap); + reservedForExchange.put(grp.groupId(), cacheMap); } - cacheMap.put(part.id(), new 
T2<>(cpEntry.partitionCounter(cacheCtx.cacheId(), part.id()), cpEntry.cpMark)); + cacheMap.put(part.id(), new T2<>(cpEntry.partitionCounter(grp.groupId(), part.id()), cpEntry.cpMark)); } } catch (IgniteCheckedException ex) { @@ -962,8 +968,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) { - CheckpointEntry cpEntry = searchCheckpointEntry(cctx.cacheContext(cacheId), partId, cntr); + @Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) { + CheckpointEntry cpEntry = searchCheckpointEntry(grpId, partId, cntr); if (cpEntry == null) return false; @@ -985,7 +991,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (reserved) - reservedForPreloading.put(new T2<>(cacheId, partId), new T2<>(cntr, ptr)); + reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr)); return reserved; } @@ -1050,30 +1056,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * Schedules partition destroy during next checkpoint. This method must be called inside checkpoint read lock. - * - * @param cacheCtx Cache context. - * @param partId Partition ID. - */ - public void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) { - Checkpointer cp = checkpointer; - - if (cp != null) - cp.schedulePartitionDestroy(cacheCtx, partId); - } - - /** * Cancels partition destroy if it has not begun yet. Otherwise, will wait for cleanup to finish. * - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param partId Partition ID. 
*/ - public void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) + void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException { Checkpointer cp = checkpointer; if (cp != null) - cp.cancelOrWaitPartitionDestroy(cacheCtx, partId); + cp.cancelOrWaitPartitionDestroy(grpId, partId); } @@ -1081,13 +1074,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Tries to search for a WAL pointer for the given partition counter start. * - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param part Partition ID. * @param partCntrSince Partition counter or {@code null} to search for minimal counter. * @return Checkpoint entry or {@code null} if failed to search. */ - @Nullable public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) { - CheckpointEntry entry = searchCheckpointEntry(cacheCtx, part, partCntrSince); + @Nullable public WALPointer searchPartitionCounter(int grpId, int part, @Nullable Long partCntrSince) { + CheckpointEntry entry = searchCheckpointEntry(grpId, part, partCntrSince); if (entry == null) return null; @@ -1098,12 +1091,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Tries to search for a WAL pointer for the given partition counter start. * - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param part Partition ID. * @param partCntrSince Partition counter or {@code null} to search for minimal counter. * @return Checkpoint entry or {@code null} if failed to search. 
*/ - @Nullable private CheckpointEntry searchCheckpointEntry(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) { + @Nullable private CheckpointEntry searchCheckpointEntry(int grpId, int part, @Nullable Long partCntrSince) { boolean hasGap = false; CheckpointEntry first = null; @@ -1111,7 +1104,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { CheckpointEntry entry = checkpointHist.entry(cpTs); - Long foundCntr = entry.partitionCounter(cacheCtx.cacheId(), part); + Long foundCntr = entry.partitionCounter(grpId, part); if (foundCntr != null) { if (partCntrSince == null) { @@ -1323,7 +1316,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int cacheId = pageRec.fullPageId().cacheId(); long pageId = pageRec.fullPageId().pageId(); - PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId); long page = pageMem.acquirePage(cacheId, pageId, true); @@ -1353,7 +1346,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final int cId = destroyRec.cacheId(); final int pId = destroyRec.partitionId(); - PageMemoryEx pageMem = getPageMemoryForCacheId(cId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(cId); pageMem.clearAsync(new P3<Integer, Long, Integer>() { @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { @@ -1371,7 +1364,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int cacheId = r.cacheId(); long pageId = r.pageId(); - PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId); // Here we do not require tag check because we may be applying memory changes after // several repetitive restarts and the same pages may have changed several times. 
@@ -1416,20 +1409,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Obtains PageMemory reference from cache descriptor instead of cache context. * - * @param cacheId Cache id. + * @param grpId Cache group id. * @return PageMemoryEx instance. * @throws IgniteCheckedException if no MemoryPolicy is configured for a name obtained from cache descriptor. */ - private PageMemoryEx getPageMemoryForCacheId(int cacheId) throws IgniteCheckedException { + private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException { + // TODO IGNITE-5075: cache descriptor can be removed. GridCacheSharedContext sharedCtx = context(); String memPlcName = sharedCtx .cache() - .cacheDescriptor(cacheId) - .cacheConfiguration() + .cacheGroupDescriptors().get(grpId) + .config() .getMemoryPolicyName(); - return (PageMemoryEx) sharedCtx.database().memoryPolicy(memPlcName).pageMemory(); + return (PageMemoryEx)sharedCtx.database().memoryPolicy(memPlcName).pageMemory(); } /** @@ -1502,35 +1496,31 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private void restorePartitionState( Map<T2<Integer, Integer>, T2<Integer, Long>> partStates ) throws IgniteCheckedException { - Collection<GridCacheContext> cacheContexts = cctx.cacheContexts(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + int grpId = grp.groupId(); - for (GridCacheContext context : cacheContexts) { - int cacheId = context.cacheId(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + for (int i = 0; i < grp.affinity().partitions(); i++) { + if (storeMgr.exists(grpId, i)) { + storeMgr.ensure(grpId, i); - PageMemoryEx pageMem = (PageMemoryEx)cacheCtx.memoryPolicy().pageMemory(); - - for (int i = 0; i < context.affinity().partitions(); i++) { - if (storeMgr.exists(cacheId, i)) { - storeMgr.ensure(cacheId, i); - - if (storeMgr.pages(cacheId, i) <= 1) 
+ if (storeMgr.pages(grpId, i) <= 1) continue; - long partMetaId = pageMem.partitionMetaPageId(cacheId, i); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + long partMetaId = pageMem.partitionMetaPageId(grpId, i); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { - long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); boolean changed = false; try { PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - T2<Integer, Long> fromWal = partStates.get(new T2<>(cacheId, i)); + T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i)); - GridDhtLocalPartition part = context.topology() + GridDhtLocalPartition part = grp.topology() .localPartition(i, AffinityTopologyVersion.NONE, true); assert part != null; @@ -1543,7 +1533,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan changed = updateState(part, stateId); if (stateId == GridDhtPartitionState.OWNING.ordinal()) { - cacheCtx.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2()); + grp.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2()); if (part.initialUpdateCounter() < fromWal.get2()) { part.initialUpdateCounter(fromWal.get2()); @@ -1556,11 +1546,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan changed = updateState(part, (int)io.getPartitionState(pageAddr)); } finally { - pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, changed); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } } @@ -1598,6 +1588,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan case CREATE: case UPDATE: cacheCtx.offheap().update( + cacheCtx, dataEntry.key(), dataEntry.value(), dataEntry.writeVersion(), @@ -1611,7 +1602,7 
@@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; case DELETE: - cacheCtx.offheap().remove(dataEntry.key(), dataEntry.partitionId(), locPart); + cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), dataEntry.partitionId(), locPart); if (dataEntry.partitionCounter() != 0) cacheCtx.offheap().onPartitionInitialCounterUpdated(dataEntry.partitionId(), dataEntry.partitionCounter()); @@ -1744,7 +1735,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan ch.force(true); return type == CheckpointEntryType.START ? - new CheckpointEntry(cpTs, ptr, cpId, rec.cacheStates()) : null; + new CheckpointEntry(cpTs, ptr, cpId, rec.cacheGroupStates()) : null; } catch (IOException e) { throw new IgniteCheckedException(e); @@ -1771,28 +1762,29 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final Set<PageMemoryEx> pageMemSet = new HashSet<>(); for (PartitionDestroyRequest req : reqs) { - Collection<Integer> partIds = filterMap.get(req.cacheId); + Collection<Integer> partIds = filterMap.get(req.grpId); if (partIds == null) { partIds = new HashSet<>(); - filterMap.put(req.cacheId, partIds); + filterMap.put(req.grpId, partIds); } partIds.add(req.partId); - pageMemSet.add((PageMemoryEx)cctx.cacheContext(req.cacheId).memoryPolicy().pageMemory()); + // TODO IGNITE-5075. 
+ pageMemSet.add((PageMemoryEx)cctx.cache().cacheGroup(req.grpId).memoryPolicy().pageMemory()); } for (PageMemoryEx pageMem : pageMemSet) { IgniteInternalFuture<Void> clearFut = pageMem.clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { - assert cacheId != null; + @Override public boolean apply(Integer grpId, Long pageId, Integer tag) { + assert grpId != null; assert pageId != null; int partId = PageIdUtils.partId(pageId); - Collection<Integer> parts = filterMap.get(cacheId); + Collection<Integer> parts = filterMap.get(grpId); return parts != null && parts.contains(partId); } @@ -1805,7 +1797,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan assert !req.allowFastEviction; // Tag should never grow in this case. - cctx.pageStore().onPartitionDestroyed(req.cacheId, req.partId, 1); + cctx.pageStore().onPartitionDestroyed(req.grpId, req.partId, 1); } catch (IgniteCheckedException e) { req.onDone(e); @@ -1932,36 +1924,36 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param cacheCtx Cache context. + * @param grp Cache group. * @param partId Partition ID. */ - private void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) { + private void schedulePartitionDestroy(CacheGroupContext grp, int partId) { synchronized (this) { - scheduledCp.destroyQueue.addDestroyRequest(cacheCtx, partId); + scheduledCp.destroyQueue.addDestroyRequest(grp, partId); } wakeupForCheckpoint(partDestroyCheckpointDelay, "partition destroy"); } /** - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param partId Partition ID. 
*/ - private void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) + private void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException { CheckpointProgress cur = curCpProgress; PartitionDestroyRequest req; if (cur != null) { - req = cur.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId); + req = cur.destroyQueue.cancelDestroy(grpId, partId); if (req != null) req.waitCompleted(); } synchronized (this) { - req = scheduledCp.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId); + req = scheduledCp.destroyQueue.cancelDestroy(grpId, partId); } if (req != null) @@ -2060,7 +2052,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (req != null) { // Log destroy record before actual partition clear. - lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.cacheId, req.partId)); + lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.grpId, req.partId)); if (reqs == null) reqs = new ArrayList<>(); @@ -2178,18 +2170,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (DbCheckpointListener lsnr : lsnrs) lsnr.onCheckpointBegin(ctx0); - Collection<GridCacheContext> cacheCtxs = ((GridCacheSharedContext<Object, Object>)cctx).cacheContexts(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; - for (GridCacheContext cacheCtx : cacheCtxs) { CacheState state = new CacheState(); - if (cacheCtx.isLocal()) - continue; - - for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions()) - state.addPartitionState(part.id(), part.dataStore().size(), part.lastAppliedUpdate()); + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) + state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate()); - cpRec.addCacheState(cacheCtx.cacheId(), state); + cpRec.addCacheGroupState(grp.groupId(), state); } if (curr.nextSnapshot) @@ -2359,14 +2349,14 @@ public 
class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan snapshotMgr.beforePageWrite(fullId); - int cacheId = fullId.cacheId(); + int grpId = fullId.cacheId(); - GridCacheContext cacheCtx = context().cacheContext(cacheId); + CacheGroupContext grp = context().cache().cacheGroup(grpId); - if (cacheCtx == null) + if (grp == null) continue; - PageMemoryEx pageMem = (PageMemoryEx) cacheCtx.memoryPolicy().pageMemory(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf); @@ -2385,7 +2375,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan PageIO.setCrc(writeAddr, 0); - PageStore store = storeMgr.writeInternal(cacheId, fullId.pageId(), tmpWriteBuf, tag); + PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag); updStores.add(store); } @@ -2685,15 +2675,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { entry.initIfNeeded(cctx); - if (entry.cacheStates == null) + if (entry.cacheGrpStates == null) continue; - CacheState cacheState = entry.cacheStates.get(cacheId); + CacheState grpState = entry.cacheGrpStates.get(cacheId); - if (cacheState == null) + if (grpState == null) continue; - CacheState.PartitionState partState = cacheState.partitions().get(partId); + CacheState.PartitionState partState = grpState.partitions().get(partId); if (partState != null) { if (cctx.wal().reserve(entry.checkpointMark())) @@ -2734,7 +2724,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private UUID cpId; /** Cache states. Initialized lazily. */ - private Map<Integer, CacheState> cacheStates; + private Map<Integer, CacheState> cacheGrpStates; /** Initialization exception. */ private IgniteCheckedException initEx; @@ -2760,13 +2750,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param cpTs Checkpoint timestamp. 
* @param cpMark Checkpoint mark pointer. * @param cpId Checkpoint ID. - * @param cacheStates Cache states. + * @param cacheGrpStates Cache groups states. */ - private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheStates) { + private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheGrpStates) { this.cpTs = cpTs; this.cpMark = cpMark; this.cpId = cpId; - this.cacheStates = cacheStates; + this.cacheGrpStates = cacheGrpStates; initGuard = 1; initLatch = new CountDownLatch(0); @@ -2808,17 +2798,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param part Partition ID. * @return Partition counter or {@code null} if not found. */ - private Long partitionCounter(int cacheId, int part) { + private Long partitionCounter(int grpId, int part) { assert initGuard != 0; - if (initEx != null || cacheStates == null) + if (initEx != null || cacheGrpStates == null) return null; - CacheState state = cacheStates.get(cacheId); + CacheState state = cacheGrpStates.get(grpId); if (state != null) { CacheState.PartitionState partState = state.partitions().get(part); @@ -2841,7 +2831,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CheckpointRecord rec = (CheckpointRecord)tup.get2(); cpId = rec.checkpointId(); - cacheStates = rec.cacheStates(); + cacheGrpStates = rec.cacheGroupStates(); } else initEx = new IgniteCheckedException("Failed to find checkpoint record at " + @@ -2874,16 +2864,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan new ConcurrentHashMap<>(); /** - * @param cacheCtx Cache context. + * @param grp Cache group. * @param partId Partition ID to destroy. 
*/ - private void addDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) { - PartitionDestroyRequest req = new PartitionDestroyRequest(cacheCtx, partId); + private void addDestroyRequest(CacheGroupContext grp, int partId) { + PartitionDestroyRequest req = new PartitionDestroyRequest(grp, partId); - PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(cacheCtx.cacheId(), partId), req); + PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(grp.groupId(), partId), req); assert old == null : "Must wait for old destroy request to finish before adding a new one " + - "[cacheId=" + cacheCtx.cacheId() + ", cacheName=" + cacheCtx.name() + ", partId=" + partId + ']'; + "[grpId=" + grp.groupId() + ", name=" + grp.cacheOrGroupName() + ", partId=" + partId + ']'; } /** @@ -2913,10 +2903,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private static class PartitionDestroyRequest { /** */ - private int cacheId; + private int grpId; /** */ - private String cacheName; + private String name; /** */ private int partId; @@ -2931,13 +2921,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private GridFutureAdapter<Void> destroyFut; /** - * @param cacheCtx Cache context. + * @param grp Cache group. * @param partId Partition ID. 
*/ - private PartitionDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) { - cacheId = cacheCtx.cacheId(); - cacheName = cacheCtx.name(); - allowFastEviction = cacheCtx.allowFastEviction(); + private PartitionDestroyRequest(CacheGroupContext grp, int partId) { + grpId = grp.groupId(); + name = grp.cacheOrGroupName(); + allowFastEviction = grp.allowFastEviction(); this.partId = partId; } @@ -3005,7 +2995,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override public String toString() { - return "PartitionDestroyRequest [cacheId=" + cacheId + ", cacheName=" + cacheName + + return "PartitionDestroyRequest [grpId=" + grpId + ", name=" + name + ", partId=" + partId + ']'; } }
