http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index 08857ee..71c8014 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -214,7 +215,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private volatile GridFutureAdapter<Void> enableChangeApplied; /** */ - public ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); + private ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); /** */ private long checkpointFreq; @@ -388,25 +389,25 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!reconnect && !cctx.kernalContext().clientNode() && cctx.kernalContext().state().active()) { Collection<String> cacheNames = new HashSet<>(); - for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) + // TODO IGNITE-5075 group descriptors. + for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) { if (CU.isSystemCache(ccfg.getName())) { - storeMgr.initializeForCache(ccfg); + storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg); cacheNames.add(ccfg.getName()); } + } for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) if (!CU.isSystemCache(ccfg.getName())) { - storeMgr.initializeForCache(ccfg); + storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg); cacheNames.add(ccfg.getName()); } - for (String name : cctx.pageStore().savedCacheNames()) { - CacheConfiguration ccfg = cctx.pageStore().readConfiguration(name); - - if (ccfg != null && !cacheNames.contains(name)) - storeMgr.initializeForCache(ccfg); + for (CacheConfiguration ccfg : cctx.pageStore().readCacheConfigurations().values()) { + if (!cacheNames.contains(ccfg.getName())) + storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg); } readCheckpointAndRestoreMemory(); @@ -490,7 +491,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override public void onCacheStop(GridCacheContext cctx) { - snapshotMgr.onCacheStop(cctx); + snapshotMgr.onCacheStop(cctx); // TODO IGNITE-5075. } /** {@inheritDoc} */ @@ -527,7 +528,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - protected long[] calculateFragmentSizes(int concLvl, long cacheSize) { + private long[] calculateFragmentSizes(int concLvl, long cacheSize) { if (concLvl < 2) concLvl = Runtime.getRuntime().availableProcessors(); @@ -575,7 +576,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan FullPageId fullId, PageMemoryEx pageMem ) throws IgniteCheckedException { - snapshotMgr.onChangeTrackerPage(page,fullId,pageMem); + snapshotMgr.onChangeTrackerPage(page, fullId, pageMem); } }, this @@ -693,7 +694,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) { + @Override public void onCacheGroupsStopped( + Collection<IgniteBiTuple<CacheGroupInfrastructure, Boolean>> stoppedGrps) { try { waitForCheckpoint("caches stop"); } @@ -703,30 +705,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>(); - for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) { + for (IgniteBiTuple<CacheGroupInfrastructure, Boolean> tup : stoppedGrps) { PageMemoryEx pageMem = (PageMemoryEx)tup.get1().memoryPolicy().pageMemory(); - Collection<Integer> cacheIds = destroyed.get(pageMem); + Collection<Integer> grpIds = destroyed.get(pageMem); - if (cacheIds == null) { - cacheIds = new HashSet<>(); + if (grpIds == null) { + grpIds = new HashSet<>(); - destroyed.put(pageMem, cacheIds); + destroyed.put(pageMem, grpIds); } - cacheIds.add(tup.get1().cacheId()); + grpIds.add(tup.get1().groupId()); - pageMem.onCacheDestroyed(tup.get1().cacheId()); + pageMem.onCacheGroupDestroyed(tup.get1().groupId()); } Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size()); for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) { - final Collection<Integer> cacheIds = entry.getValue(); + final Collection<Integer> grpIds = entry.getValue(); clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { - return cacheIds.contains(cacheId); + @Override public boolean apply(Integer grpId, Long pageId, Integer tag) { + return grpIds.contains(grpId); } }, false)); } @@ -740,15 +742,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (cctx.pageStore() != null) { - for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) { - GridCacheContext cacheCtx = tup.get1(); + for (IgniteBiTuple<CacheGroupInfrastructure, Boolean> tup : stoppedGrps) { + CacheGroupInfrastructure grp = tup.get1(); try { - cctx.pageStore().shutdownForCache(cacheCtx, tup.get2()); + cctx.pageStore().shutdownForCacheGroup(grp, tup.get2()); } catch (IgniteCheckedException e) { U.error(log, "Failed to gracefully clean page store resources for destroyed cache " + - "[cache=" + cacheCtx.name() + "]", e); + "[cache=" + grp.cacheOrGroupName() + "]", e); } } } @@ -825,15 +827,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointLock.readLock().unlock(); if (checkpointer != null) { - Collection<GridCacheContext> cacheCtxs = context().cacheContexts(); + Collection<MemoryPolicy> memPlcs = context().database().memoryPolicies(); - for (GridCacheContext cacheCtx : cacheCtxs) { - PageMemoryEx mem = (PageMemoryEx) cacheCtx.memoryPolicy().pageMemory(); + if (memPlcs != null) { + for (MemoryPolicy memPlc : memPlcs) { + PageMemoryEx mem = (PageMemoryEx)memPlc.pageMemory(); - if (mem != null && !mem.safeToUpdate()) { - checkpointer.wakeupForCheckpoint(0, "too many dirty pages"); + if (mem != null && !mem.safeToUpdate()) { + checkpointer.wakeupForCheckpoint(0, "too many dirty pages"); - break; + break; + } } } } @@ -875,34 +879,34 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan reservedForExchange = new HashMap<>(); - for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions()) { - if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().size() <= ggWalRebalanceThreshold) + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) { + if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().fullSize() <= ggWalRebalanceThreshold) continue; for (Long cpTs : checkpointHist.checkpoints()) { try { CheckpointEntry entry = checkpointHist.entry(cpTs); - if (!entry.cacheStates.containsKey(cacheCtx.cacheId()) || - !entry.cacheStates.get(cacheCtx.cacheId()).partitions().containsKey(part.id())) + if (!entry.cacheGrpStates.containsKey(grp.groupId()) || + !entry.cacheGrpStates.get(grp.groupId()).partitions().containsKey(part.id())) continue; - WALPointer ptr = searchPartitionCounter(cacheCtx, part.id(), entry.checkpointTimestamp()); + WALPointer ptr = searchPartitionCounter(grp.groupId(), part.id(), entry.checkpointTimestamp()); if (ptr != null && cctx.wal().reserve(ptr)) { - Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId()); + Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(grp.groupId()); if (cacheMap == null) { cacheMap = new HashMap<>(); - reservedForExchange.put(cacheCtx.cacheId(), cacheMap); + reservedForExchange.put(grp.groupId(), cacheMap); } - cacheMap.put(part.id(), new T2<>(entry.partitionCounter(cacheCtx.cacheId(), part.id()), ptr)); + cacheMap.put(part.id(), new T2<>(entry.partitionCounter(grp.groupId(), part.id()), ptr)); } } catch (IgniteCheckedException ex) { @@ -947,8 +951,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) { - WALPointer ptr = searchPartitionCounter(cctx.cacheContext(cacheId), partId, cntr); + @Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) { + WALPointer ptr = searchPartitionCounter(grpId, partId, cntr); if (ptr == null) return false; @@ -965,7 +969,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } if (reserved) - reservedForPreloading.put(new T2<>(cacheId, partId), new T2<>(cntr, ptr)); + reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr)); return reserved; } @@ -1030,30 +1034,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * Schedules partition destroy during next checkpoint. This method must be called inside checkpoint read lock. - * - * @param cacheCtx Cache context. - * @param partId Partition ID. - */ - public void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) { - Checkpointer cp = checkpointer; - - if (cp != null) - cp.schedulePartitionDestroy(cacheCtx, partId); - } - - /** * Cancels partition destroy if it has not begun yet. Otherwise, will wait for cleanup to finish. * - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param partId Partition ID. */ - public void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) + void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException { Checkpointer cp = checkpointer; if (cp != null) - cp.cancelOrWaitPartitionDestroy(cacheCtx, partId); + cp.cancelOrWaitPartitionDestroy(grpId, partId); } @@ -1061,12 +1052,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Tries to search for a WAL pointer for the given partition counter start. * - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param part Partition ID. * @param partCntrSince Partition counter. * @return WAL pointer or {@code null} if failed to search. */ - public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, Long partCntrSince) { + WALPointer searchPartitionCounter(int grpId, int part, Long partCntrSince) { boolean hasGap = false; WALPointer first = null; @@ -1074,7 +1065,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { CheckpointEntry entry = checkpointHist.entry(cpTs); - Long foundCntr = entry.partitionCounter(cacheCtx.cacheId(), part); + Long foundCntr = entry.partitionCounter(grpId, part); if (foundCntr != null) { if (foundCntr <= partCntrSince) { @@ -1276,7 +1267,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int cacheId = pageRec.fullPageId().cacheId(); long pageId = pageRec.fullPageId().pageId(); - PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId); long page = pageMem.acquirePage(cacheId, pageId, true); @@ -1306,7 +1297,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final int cId = destroyRec.cacheId(); final int pId = destroyRec.partitionId(); - PageMemoryEx pageMem = getPageMemoryForCacheId(cId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(cId); pageMem.clearAsync(new P3<Integer, Long, Integer>() { @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { @@ -1324,7 +1315,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int cacheId = r.cacheId(); long pageId = r.pageId(); - PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId); + PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId); // Here we do not require tag check because we may be applying memory changes after // several repetitive restarts and the same pages may have changed several times. @@ -1369,17 +1360,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * Obtains PageMemory reference from cache descriptor instead of cache context. * - * @param cacheId Cache id. + * @param grpId Cache group id. * @return PageMemoryEx instance. * @throws IgniteCheckedException if no MemoryPolicy is configured for a name obtained from cache descriptor. */ - private PageMemoryEx getPageMemoryForCacheId(int cacheId) throws IgniteCheckedException { + private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException { + // TODO IGNITE-5075: group ID should not change. + // TODO IGNITE-5075: cache descriptor can be removed. GridCacheSharedContext sharedCtx = context(); String memPlcName = sharedCtx .cache() - .cacheDescriptor(cacheId) - .cacheConfiguration() + .cacheGroupDescriptors().get(grpId) + .config() .getMemoryPolicyName(); return (PageMemoryEx) sharedCtx.database().memoryPolicy(memPlcName).pageMemory(); @@ -1455,35 +1448,31 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private void restorePartitionState( Map<T2<Integer, Integer>, T2<Integer, Long>> partStates ) throws IgniteCheckedException { - Collection<GridCacheContext> cacheContexts = cctx.cacheContexts(); - - for (GridCacheContext context : cacheContexts) { - int cacheId = context.cacheId(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + int grpId = grp.groupId(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - PageMemoryEx pageMem = (PageMemoryEx)cacheCtx.memoryPolicy().pageMemory(); + for (int i = 0; i < grp.affinity().partitions(); i++) { + if (storeMgr.exists(grpId, i)) { + storeMgr.ensure(grpId, i); - for (int i = 0; i < context.affinity().partitions(); i++) { - if (storeMgr.exists(cacheId, i)) { - storeMgr.ensure(cacheId, i); - - if (storeMgr.pages(cacheId, i) <= 1) + if (storeMgr.pages(grpId, i) <= 1) continue; - long partMetaId = pageMem.partitionMetaPageId(cacheId, i); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + long partMetaId = pageMem.partitionMetaPageId(grpId, i); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { - long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); boolean changed = false; try { PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr); - T2<Integer, Long> fromWal = partStates.get(new T2<>(cacheId, i)); + T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i)); - GridDhtLocalPartition part = context.topology() + GridDhtLocalPartition part = grp.topology() .localPartition(i, AffinityTopologyVersion.NONE, true); assert part != null; @@ -1496,7 +1485,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan changed = updateState(part, stateId); if (stateId == GridDhtPartitionState.OWNING.ordinal()) { - cacheCtx.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2()); + grp.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2()); if (part.initialUpdateCounter() < fromWal.get2()) { part.initialUpdateCounter(fromWal.get2()); @@ -1509,11 +1498,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan changed = updateState(part, (int)io.getPartitionState(pageAddr)); } finally { - pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, changed); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } } @@ -1551,6 +1540,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan case CREATE: case UPDATE: cacheCtx.offheap().update( + cacheCtx, dataEntry.key(), dataEntry.value(), dataEntry.writeVersion(), @@ -1564,7 +1554,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; case DELETE: - cacheCtx.offheap().remove(dataEntry.key(), dataEntry.partitionId(), locPart); + cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), dataEntry.partitionId(), locPart); if (dataEntry.partitionCounter() != 0) cacheCtx.offheap().onPartitionInitialCounterUpdated(dataEntry.partitionId(), dataEntry.partitionCounter()); @@ -1697,7 +1687,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan ch.force(true); return type == CheckpointEntryType.START ? - new CheckpointEntry(cpTs, ptr, cpId, rec.cacheStates()) : null; + new CheckpointEntry(cpTs, ptr, cpId, rec.cacheGroupStates()) : null; } catch (IOException e) { throw new IgniteCheckedException(e); @@ -1724,28 +1714,29 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan final Set<PageMemoryEx> pageMemSet = new HashSet<>(); for (PartitionDestroyRequest req : reqs) { - Collection<Integer> partIds = filterMap.get(req.cacheId); + Collection<Integer> partIds = filterMap.get(req.grpId); if (partIds == null) { partIds = new HashSet<>(); - filterMap.put(req.cacheId, partIds); + filterMap.put(req.grpId, partIds); } partIds.add(req.partId); - pageMemSet.add((PageMemoryEx)cctx.cacheContext(req.cacheId).memoryPolicy().pageMemory()); + // TODO IGNITE-5075. + pageMemSet.add((PageMemoryEx)cctx.cache().cacheGroup(req.grpId).memoryPolicy().pageMemory()); } for (PageMemoryEx pageMem : pageMemSet) { IgniteInternalFuture<Void> clearFut = pageMem.clearAsync(new P3<Integer, Long, Integer>() { - @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) { - assert cacheId != null; + @Override public boolean apply(Integer grpId, Long pageId, Integer tag) { + assert grpId != null; assert pageId != null; int partId = PageIdUtils.partId(pageId); - Collection<Integer> parts = filterMap.get(cacheId); + Collection<Integer> parts = filterMap.get(grpId); return parts != null && parts.contains(partId); } @@ -1758,7 +1749,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan assert !req.allowFastEviction; // Tag should never grow in this case. - cctx.pageStore().onPartitionDestroyed(req.cacheId, req.partId, 1); + cctx.pageStore().onPartitionDestroyed(req.grpId, req.partId, 1); } catch (IgniteCheckedException e) { req.onDone(e); @@ -1885,36 +1876,36 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param cacheCtx Cache context. + * @param grp Cache group. * @param partId Partition ID. */ - private void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) { + private void schedulePartitionDestroy(CacheGroupInfrastructure grp, int partId) { synchronized (this) { - scheduledCp.destroyQueue.addDestroyRequest(cacheCtx, partId); + scheduledCp.destroyQueue.addDestroyRequest(grp, partId); } wakeupForCheckpoint(partDestroyCheckpointDelay, "partition destroy"); } /** - * @param cacheCtx Cache context. + * @param grpId Cache group ID. * @param partId Partition ID. */ - private void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) + private void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException { CheckpointProgress cur = curCpProgress; PartitionDestroyRequest req; if (cur != null) { - req = cur.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId); + req = cur.destroyQueue.cancelDestroy(grpId, partId); if (req != null) req.waitCompleted(); } synchronized (this) { - req = scheduledCp.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId); + req = scheduledCp.destroyQueue.cancelDestroy(grpId, partId); } if (req != null) @@ -2013,7 +2004,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (req != null) { // Log destroy record before actual partition clear. - lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.cacheId, req.partId)); + lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.grpId, req.partId)); if (reqs == null) reqs = new ArrayList<>(); @@ -2131,18 +2122,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (DbCheckpointListener lsnr : lsnrs) lsnr.onCheckpointBegin(ctx0); - Collection<GridCacheContext> cacheCtxs = ((GridCacheSharedContext<Object, Object>)cctx).cacheContexts(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; - for (GridCacheContext cacheCtx : cacheCtxs) { CacheState state = new CacheState(); - if (cacheCtx.isLocal()) - continue; + for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) + state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate()); - for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions()) - state.addPartitionState(part.id(), part.dataStore().size(), part.lastAppliedUpdate()); - - cpRec.addCacheState(cacheCtx.cacheId(), state); + cpRec.addCacheGroupState(grp.groupId(), state); } if (curr.nextSnapshot) @@ -2638,15 +2627,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { entry.initIfNeeded(cctx); - if (entry.cacheStates == null) + if (entry.cacheGrpStates == null) continue; - CacheState cacheState = entry.cacheStates.get(cacheId); + CacheState grpState = entry.cacheGrpStates.get(cacheId); - if (cacheState == null) + if (grpState == null) continue; - CacheState.PartitionState partState = cacheState.partitions().get(partId); + CacheState.PartitionState partState = grpState.partitions().get(partId); if (partState != null) { if (cctx.wal().reserve(entry.checkpointMark())) @@ -2687,7 +2676,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private UUID cpId; /** Cache states. Initialized lazily. */ - private Map<Integer, CacheState> cacheStates; + private Map<Integer, CacheState> cacheGrpStates; /** Initialization exception. */ private IgniteCheckedException initEx; @@ -2713,13 +2702,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param cpTs Checkpoint timestamp. * @param cpMark Checkpoint mark pointer. * @param cpId Checkpoint ID. - * @param cacheStates Cache states. + * @param cacheGrpStates Cache groups states. */ - private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheStates) { + private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheGrpStates) { this.cpTs = cpTs; this.cpMark = cpMark; this.cpId = cpId; - this.cacheStates = cacheStates; + this.cacheGrpStates = cacheGrpStates; initGuard = 1; initLatch = new CountDownLatch(0); @@ -2761,17 +2750,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param part Partition ID. * @return Partition counter or {@code null} if not found. */ - private Long partitionCounter(int cacheId, int part) { + private Long partitionCounter(int grpId, int part) { assert initGuard != 0; - if (initEx != null || cacheStates == null) + if (initEx != null || cacheGrpStates == null) return null; - CacheState state = cacheStates.get(cacheId); + CacheState state = cacheGrpStates.get(grpId); if (state != null) { CacheState.PartitionState partState = state.partitions().get(part); @@ -2794,7 +2783,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CheckpointRecord rec = (CheckpointRecord)tup.get2(); cpId = rec.checkpointId(); - cacheStates = rec.cacheStates(); + cacheGrpStates = rec.cacheGroupStates(); } else initEx = new IgniteCheckedException("Failed to find checkpoint record at " + @@ -2827,16 +2816,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan new ConcurrentHashMap<>(); /** - * @param cacheCtx Cache context. + * @param grp Cache group. * @param partId Partition ID to destroy. */ - private void addDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) { - PartitionDestroyRequest req = new PartitionDestroyRequest(cacheCtx, partId); + private void addDestroyRequest(CacheGroupInfrastructure grp, int partId) { + PartitionDestroyRequest req = new PartitionDestroyRequest(grp, partId); - PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(cacheCtx.cacheId(), partId), req); + PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(grp.groupId(), partId), req); assert old == null : "Must wait for old destroy request to finish before adding a new one " + - "[cacheId=" + cacheCtx.cacheId() + ", cacheName=" + cacheCtx.name() + ", partId=" + partId + ']'; + "[grpId=" + grp.groupId() + ", name=" + grp.cacheOrGroupName() + ", partId=" + partId + ']'; } /** @@ -2866,10 +2855,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan */ private static class PartitionDestroyRequest { /** */ - private int cacheId; + private int grpId; /** */ - private String cacheName; + private String name; /** */ private int partId; @@ -2884,13 +2873,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private GridFutureAdapter<Void> destroyFut; /** - * @param cacheCtx Cache context. + * @param grp Cache group. * @param partId Partition ID. */ - private PartitionDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) { - cacheId = cacheCtx.cacheId(); - cacheName = cacheCtx.name(); - allowFastEviction = cacheCtx.allowFastEviction(); + private PartitionDestroyRequest(CacheGroupInfrastructure grp, int partId) { + grpId = grp.groupId(); + name = grp.cacheOrGroupName(); + allowFastEviction = grp.allowFastEviction(); this.partId = partId; } @@ -2958,7 +2947,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override public String toString() { - return "PartitionDestroyRequest [cacheId=" + cacheId + ", cacheName=" + cacheName + + return "PartitionDestroyRequest [grpId=" + grpId + ", name=" + name + ", partId=" + partId + ']'; } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java index 5253818..f8f91df 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.database; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; @@ -39,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSna import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; @@ -80,60 +82,71 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple RootPage reuseListRoot = metas.reuseListRoot; - reuseList = new ReuseListImpl(cctx.cacheId(), - cctx.name(), - cctx.memoryPolicy().pageMemory(), - cctx.shared().wal(), + reuseList = new ReuseListImpl(grp.groupId(), + grp.cacheOrGroupName(), + grp.memoryPolicy().pageMemory(), + ctx.wal(), reuseListRoot.pageId().pageId(), reuseListRoot.isAllocated()); RootPage metastoreRoot = metas.treeRoot; - metaStore = new MetadataStorage(cctx.memoryPolicy().pageMemory(), - cctx.shared().wal(), + metaStore = new MetadataStorage(grp.memoryPolicy().pageMemory(), + ctx.wal(), globalRemoveId(), - cctx.cacheId(), + grp.groupId(), PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, reuseList, metastoreRoot.pageId().pageId(), metastoreRoot.isAllocated()); - if (cctx.shared().ttl().eagerTtlEnabled()) { - final String name = "PendingEntries"; + ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this); + } - RootPage pendingRootPage = metaStore.getOrAllocateForTree(name); + /** {@inheritDoc} */ + @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { + ctx.database().checkpointReadLock(); - pendingEntries = new PendingEntriesTree( - cctx, - name, - cctx.memoryPolicy().pageMemory(), - pendingRootPage.pageId().pageId(), - reuseList, - pendingRootPage.isAllocated() - ); + try { + final String name = "PendingEntries"; + + // TODO IGNITE-5075: per cache? + RootPage pendingRootPage = metaStore.getOrAllocateForTree(name); + + pendingEntries = new PendingEntriesTree( + grp, + name, + grp.memoryPolicy().pageMemory(), + pendingRootPage.pageId().pageId(), + reuseList, + pendingRootPage.isAllocated() + ); + } + finally { + ctx.database().checkpointReadUnlock(); + } } - - ((GridCacheDatabaseSharedManager)cctx.shared().database()).addCheckpointListener(this); } /** {@inheritDoc} */ @Override protected CacheDataStore createCacheDataStore0(final int p) throws IgniteCheckedException { - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.shared().database(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database(); - if (!cctx.allowFastEviction()) - dbMgr.cancelOrWaitPartitionDestroy(cctx, p); + if (!grp.allowFastEviction()) + dbMgr.cancelOrWaitPartitionDestroy(grp.groupId(), p); - boolean exists = cctx.shared().pageStore() != null - && cctx.shared().pageStore().exists(cctx.cacheId(), p); + boolean exists = ctx.pageStore() != null + && ctx.pageStore().exists(grp.groupId(), p); return new GridCacheDataStore(p, exists); } /** {@inheritDoc} */ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { - assert cctx.memoryPolicy().pageMemory() instanceof PageMemoryEx; + assert grp.memoryPolicy().pageMemory() instanceof PageMemoryEx; reuseList.saveMetadata(); @@ -166,12 +179,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple freeList.saveMetadata(); + // TODO IGNITE-5075. long updCntr = store.updateCounter(); - int size = store.size(); + int size = store.fullSize(); long rmvId = globalRemoveId().get(); - PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = cctx.shared().wal(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); + IgniteWriteAheadLogManager wal = this.ctx.wal(); if (size > 0 || updCntr > 0) { int state = -1; @@ -180,7 +194,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple state = GridDhtPartitionState.EVICTED.ordinal(); else { // localPartition will not acquire writeLock here because create=false. - GridDhtLocalPartition part = cctx.topology().localPartition(store.partId(), + GridDhtLocalPartition part = grp.topology().localPartition(store.partId(), AffinityTopologyVersion.NONE, false); if (part != null && part.state() != GridDhtPartitionState.EVICTED) @@ -191,12 +205,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (state == -1) return false; - int cacheId = cctx.cacheId(); - long partMetaId = pageMem.partitionMetaPageId(cacheId, store.partId()); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + int grpId = grp.groupId(); + long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId()); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { - long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); try { PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); @@ -210,52 +224,52 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple int pageCount; if (beforeSnapshot) { - pageCount = cctx.shared().pageStore().pages(cctx.cacheId(), store.partId()); + pageCount = this.ctx.pageStore().pages(grpId, store.partId()); io.setCandidatePageCount(pageAddr, pageCount); if (saveMeta) { - long metaPageId = pageMem.metaPageId(cctx.cacheId()); - long metaPage = pageMem.acquirePage(cctx.cacheId(), metaPageId); + long metaPageId = pageMem.metaPageId(grpId); + long metaPage = pageMem.acquirePage(grpId, metaPageId); try { - long metaPageAddr = pageMem.writeLock(cctx.cacheId(), metaPageId, metaPage); + long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage); try { long nextSnapshotTag = io.getNextSnapshotTag(metaPageAddr); io.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cctx.cacheId(), metaPageId, + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId, metaPage, wal, null)) - wal.log(new MetaPageUpdateNextSnapshotId(cctx.cacheId(), metaPageId, + wal.log(new MetaPageUpdateNextSnapshotId(grpId, metaPageId, nextSnapshotTag + 1)); - addPartition(ctx.partitionStatMap(), metaPageAddr, io, cctx.cacheId(), PageIdAllocator.INDEX_PARTITION, - cctx.kernalContext().cache().context().pageStore().pages(cacheId, PageIdAllocator.INDEX_PARTITION)); + addPartition(ctx.partitionStatMap(), metaPageAddr, io, grpId, PageIdAllocator.INDEX_PARTITION, + this.ctx.kernalContext().cache().context().pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION)); } finally { - pageMem.writeUnlock(cctx.cacheId(), metaPageId, metaPage, null, true); + pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true); } } finally { - pageMem.releasePage(cctx.cacheId(), metaPageId, metaPage); + pageMem.releasePage(grpId, metaPageId, metaPage); } wasSaveToMeta = true; } - GridDhtPartitionMap partMap = cctx.topology().localPartitionMap(); + GridDhtPartitionMap partMap = grp.topology().localPartitionMap(); if (partMap.containsKey(store.partId()) && partMap.get(store.partId()) == GridDhtPartitionState.OWNING) - addPartition(ctx.partitionStatMap(), pageAddr, io, cctx.cacheId(), store.partId(), - cctx.kernalContext().cache().context().pageStore().pages(cctx.cacheId(), store.partId())); + addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(), + this.ctx.pageStore().pages(grpId, store.partId())); } else pageCount = io.getCandidatePageCount(pageAddr); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, partMetaId, partMetaPage, wal, null)) + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) wal.log(new MetaPageUpdatePartitionDataRecord( - cacheId, + grpId, partMetaId, updCntr, rmvId, @@ -265,11 +279,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple )); } finally { - pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, true); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, true); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } } @@ -304,23 +318,23 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { int p = store.partId(); saveStoreMetadata(store, null, false, true); - PageMemoryEx pageMemory = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); + PageMemoryEx pageMemory = (PageMemoryEx)grp.memoryPolicy().pageMemory(); - int tag = pageMemory.invalidate(cctx.cacheId(), p); + int tag = pageMemory.invalidate(grp.groupId(), p); - cctx.shared().wal().log(new PartitionDestroyRecord(cctx.cacheId(), p)); + ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p)); - cctx.shared().pageStore().onPartitionDestroyed(cctx.cacheId(), p, tag); + ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } @@ -355,11 +369,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException { + // TODO IGNITE-5075: per cache? return metaStore.getOrAllocateForTree(idxName); } /** {@inheritDoc} */ @Override public void dropRootPageForIndex(String idxName) throws IgniteCheckedException { + // TODO IGNITE-5075: per cache? metaStore.dropRootPage(idxName); } @@ -369,10 +385,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override protected void destroyCacheDataStructures() { - assert cctx.affinityNode(); - - ((GridCacheDatabaseSharedManager)cctx.shared().database()).removeCheckpointListener(this); + @Override public void stop() { + if (grp.affinityNode()) + ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this); } /** @@ -380,14 +395,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * @throws IgniteCheckedException If failed. */ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx) cctx.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = cctx.shared().wal(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); + IgniteWriteAheadLogManager wal = ctx.wal(); + + int grpId = grp.groupId(); + long metaId = pageMem.metaPageId(grpId); + long metaPage = pageMem.acquirePage(grpId, metaId); - int cacheId = cctx.cacheId(); - long metaId = pageMem.metaPageId(cacheId); - long metaPage = pageMem.acquirePage(cacheId, metaId); try { - final long pageAddr = pageMem.writeLock(cacheId, metaId, metaPage); + final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage); boolean allocated = false; @@ -399,15 +415,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple pageIO.initNewPage(pageAddr, metaId, pageMem.pageSize()); - metastoreRoot = pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); - reuseListRoot = pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); + metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); + reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); pageIO.setTreeRoot(pageAddr, metastoreRoot); pageIO.setReuseListRoot(pageAddr, reuseListRoot); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, metaId, metaPage, wal, null)) + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) wal.log(new MetaPageInitRecord( - cacheId, + grpId, metaId, pageIO.getType(), pageIO.getVersion(), @@ -427,15 +443,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } return new Metas( - new RootPage(new FullPageId(metastoreRoot, cacheId), allocated), - new RootPage(new FullPageId(reuseListRoot, cacheId), allocated)); + new RootPage(new FullPageId(metastoreRoot, grpId), allocated), + new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); } finally { - pageMem.writeUnlock(cacheId, metaId, metaPage, null, allocated); + pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated); } } finally { - pageMem.releasePage(cacheId, metaId, metaPage); + pageMem.releasePage(grpId, metaId, metaPage); } } @@ -445,10 +461,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (partCntrSince == null) return super.rebalanceIterator(part, topVer, partCntrSince); - GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)cctx.shared().database(); + GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)ctx.database(); try { - WALPointer startPtr = database.searchPartitionCounter(cctx, part, partCntrSince); + WALPointer startPtr = database.searchPartitionCounter(grp.groupId(), part, partCntrSince); if (startPtr == null) { assert false : "partCntr=" + partCntrSince + ", reservations=" + S.toString(Map.class, database.reservedForPreloading()); @@ -456,9 +472,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return super.rebalanceIterator(part, topVer, partCntrSince); } - WALIterator it = cctx.shared().wal().replay(startPtr); + WALIterator it = ctx.wal().replay(startPtr); - return new RebalanceIteratorAdapter(cctx, it, part); + return new RebalanceIteratorAdapter(grp, it, part); } catch (IgniteCheckedException e) { U.warn(log, "Failed to create WAL-based rebalance iterator (a full partition will transferred to a " + @@ -472,14 +488,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * */ private static class RebalanceIteratorAdapter implements IgniteRebalanceIterator { - /** Cache context. */ - private GridCacheContext cctx; + /** Cache group caches. */ + private final Set<Integer> cacheGrpCaches; /** WAL iterator. */ - private WALIterator walIt; + private final WALIterator walIt; /** Partition to scan. */ - private int part; + private final int part; /** */ private Iterator<DataEntry> entryIt; @@ -488,12 +504,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private CacheDataRow next; /** - * @param cctx Cache context. + * @param grp Cache group. * @param walIt WAL iterator. * @param part Partition ID. */ - private RebalanceIteratorAdapter(GridCacheContext cctx, WALIterator walIt, int part) { - this.cctx = cctx; + private RebalanceIteratorAdapter(CacheGroupInfrastructure grp, WALIterator walIt, int part) { + this.cacheGrpCaches = grp.cacheIds(); this.walIt = walIt; this.part = part; @@ -568,8 +584,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple while (entryIt.hasNext()) { DataEntry entry = entryIt.next(); - if (entry.cacheId() == cctx.cacheId() && - entry.partitionId() == part) { + if (entry.partitionId() == part && cacheGrpCaches.contains(entry.cacheId())) { next = new DataEntryRow(entry); @@ -742,7 +757,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return null; } - IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database(); + IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); dbMgr.checkpointReadLock(); @@ -753,12 +768,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple RootPage reuseRoot = metas.reuseListRoot; freeList = new FreeListImpl( - cctx.cacheId(), - cctx.name() + "-" + partId, - (MemoryMetricsImpl)cctx.memoryPolicy().memoryMetrics(), - cctx.memoryPolicy(), + grp.groupId(), + grp.cacheOrGroupName() + "-" + partId, + grp.memoryPolicy().memoryMetrics(), + grp.memoryPolicy(), null, - cctx.shared().wal(), + ctx.wal(), reuseRoot.pageId().pageId(), reuseRoot.isAllocated()) { @Override protected long allocatePageNoReuse() throws IgniteCheckedException { @@ -766,15 +781,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } }; - CacheDataRowStore rowStore = new CacheDataRowStore(cctx, freeList, partId); + CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId); RootPage treeRoot = metas.treeRoot; CacheDataTree dataTree = new CacheDataTree( + grp, name, freeList, rowStore, - cctx, treeRoot.pageId().pageId(), treeRoot.isAllocated()) { @Override protected long allocatePageNoReuse() throws IgniteCheckedException { @@ -782,15 +797,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } }; - PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree); - int cacheId = cctx.cacheId(); - long partMetaId = pageMem.partitionMetaPageId(cacheId, partId); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + int grpId = grp.groupId(); + long partMetaId = pageMem.partitionMetaPageId(grpId, partId); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { - long pageAddr = pageMem.readLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); try { if (PageIO.getType(pageAddr) != 0) { @@ -798,15 +813,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr)); - cctx.offheap().globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); + globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); } } finally { - pageMem.readUnlock(cacheId, partMetaId, partMetaPage); + pageMem.readUnlock(grpId, partMetaId, partMetaPage); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } delegate = delegate0; @@ -835,15 +850,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * @return Partition metas. */ private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory(); - IgniteWriteAheadLogManager wal = cctx.shared().wal(); + PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); + IgniteWriteAheadLogManager wal = ctx.wal(); - int cacheId = cctx.cacheId(); - long partMetaId = pageMem.partitionMetaPageId(cacheId, partId); - long partMetaPage = pageMem.acquirePage(cacheId, partMetaId); + int grpId = grp.groupId(); + long partMetaId = pageMem.partitionMetaPageId(grpId, partId); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { boolean allocated = false; - long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage); + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); try { long treeRoot, reuseListRoot; @@ -854,8 +869,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.initNewPage(pageAddr, partMetaId, pageMem.pageSize()); - treeRoot = pageMem.allocatePage(cacheId, partId, PageMemory.FLAG_DATA); - reuseListRoot = pageMem.allocatePage(cacheId, partId, PageMemory.FLAG_DATA); + treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); + reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA; assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA; @@ -863,9 +878,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.setTreeRoot(pageAddr, treeRoot); io.setReuseListRoot(pageAddr, reuseListRoot); - if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, partMetaId, partMetaPage, wal, null)) + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) wal.log(new MetaPageInitRecord( - cctx.cacheId(), + grpId, partMetaId, io.getType(), io.getVersion(), @@ -882,21 +897,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple reuseListRoot = io.getReuseListRoot(pageAddr); assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA : - U.hexLong(treeRoot) + ", part=" + partId + ", cacheId=" + cacheId; + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId; assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA : - U.hexLong(reuseListRoot) + ", part=" + partId + ", cacheId=" + cacheId; + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId; } return new Metas( - new RootPage(new FullPageId(treeRoot, cacheId), allocated), - new RootPage(new FullPageId(reuseListRoot, cacheId), allocated)); + new RootPage(new FullPageId(treeRoot, grpId), allocated), + new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); } finally { - pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, allocated); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated); } } finally { - pageMem.releasePage(cacheId, partMetaId, partMetaPage); + pageMem.releasePage(grpId, partMetaId, partMetaPage); } } @@ -918,11 +933,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public int size() { + @Override public int fullSize() { try { CacheDataStore delegate0 = init0(true); - return delegate0 == null ? 0 : delegate0.size(); + return delegate0 == null ? 0 : delegate0.fullSize(); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -998,6 +1013,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void update( + GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, @@ -1006,47 +1022,51 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple ) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.update(key, val, ver, expireTime, oldRow); + delegate.update(cctx, key, val, ver, expireTime, oldRow); } /** {@inheritDoc} */ - @Override public void updateIndexes(KeyCacheObject key) throws IgniteCheckedException { + @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.updateIndexes(key); + delegate.updateIndexes(cctx, key); } /** {@inheritDoc} */ - @Override public CacheDataRow createRow(KeyCacheObject key, + @Override public CacheDataRow createRow( + GridCacheContext cctx, + KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.createRow(key, val, ver, expireTime, oldRow); + return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); } /** {@inheritDoc} */ - @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException { + @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) + throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.invoke(key, c); + delegate.invoke(cctx, key, c); } /** {@inheritDoc} */ - @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException { + @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) + throws IgniteCheckedException { CacheDataStore delegate = init0(false); - delegate.remove(key, partId); + delegate.remove(cctx, key, partId); } /** {@inheritDoc} */ - @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { + @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.find(key); + return delegate.find(cctx, key); return null; } @@ -1062,12 +1082,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower, + @Override public GridCursor<? extends CacheDataRow> cursor( + int cacheId, + KeyCacheObject lower, KeyCacheObject upper) throws IgniteCheckedException { CacheDataStore delegate = init0(true); if (delegate != null) - return delegate.cursor(lower, upper); + return delegate.cursor(cacheId, lower, upper); return EMPTY_CURSOR; } @@ -1076,6 +1098,29 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple @Override public void destroy() throws IgniteCheckedException { // No need to destroy delegate. } + + /** {@inheritDoc} */ + @Override public int cacheSize(int cacheId) { + return 0; + } + + /** {@inheritDoc} */ + @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { + CacheDataStore delegate = init0(true); + + if (delegate != null) + return delegate.cursor(cacheId); + + return EMPTY_CURSOR; + } + + /** {@inheritDoc} */ + @Override public void clear(int cacheId) throws IgniteCheckedException { + CacheDataStore delegate = init0(true); + + if (delegate != null) + delegate.clear(cacheId); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java index 1bb83d2..576e58a 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java @@ -29,7 +29,7 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -43,10 +43,10 @@ import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.store.PageStore; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.database.IgniteCacheSnapshotManager; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; @@ -72,6 +72,9 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen public static final String CACHE_DIR_PREFIX = "cache-"; /** */ + public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-"; + + /** */ public static final String CACHE_CONF_FILENAME = "conf.dat"; /** Marshaller. */ @@ -93,7 +96,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen private final long metaPageId = PageIdUtils.pageId(-1, PageMemory.FLAG_IDX, 0); /** */ - private final Set<Integer> cachesWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); + private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); /** * @param ctx Kernal context. @@ -179,25 +182,57 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void initializeForCache(CacheConfiguration ccfg) throws IgniteCheckedException { - int cacheId = CU.cacheId(ccfg.getName()); + @Override public void initializeForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) + throws IgniteCheckedException { + int grpId = grpDesc.groupId(); - if (!idxCacheStores.containsKey(cacheId)) { - CacheStoreHolder holder = initForCache(ccfg); + if (!idxCacheStores.containsKey(grpId)) { + CacheStoreHolder holder = initForCache(grpDesc, ccfg); - CacheStoreHolder old = idxCacheStores.put(cacheId, holder); + CacheStoreHolder old = idxCacheStores.put(grpId, holder); assert old == null : "Non-null old store holder for cache: " + ccfg.getName(); } + + storeCacheConfiguration(grpDesc, ccfg); + } + + /** + * @param grpDesc Cache group descriptor. + * @param ccfg Cache configuration. + * @throws IgniteCheckedException If failed. + */ + private void storeCacheConfiguration(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) + throws IgniteCheckedException { + File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); + File file; + + if (grpDesc.sharedGroup()) + file = new File(cacheWorkDir, ccfg.getName() + CACHE_CONF_FILENAME); + else + file = new File(cacheWorkDir, CACHE_CONF_FILENAME); + + if (!file.exists() || file.length() == 0) { + try { + file.createNewFile(); + + try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) { + marshaller.marshal(ccfg, stream); + } + } + catch (IOException ex) { + throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex); + } + } } /** {@inheritDoc} */ - @Override public void shutdownForCache(GridCacheContext cacheCtx, boolean destroy) throws IgniteCheckedException { - cachesWithoutIdx.remove(cacheCtx.cacheId()); + @Override public void shutdownForCacheGroup(CacheGroupInfrastructure grp, boolean destroy) throws IgniteCheckedException { + grpsWithoutIdx.remove(grp.groupId()); - CacheStoreHolder old = idxCacheStores.remove(cacheCtx.cacheId()); + CacheStoreHolder old = idxCacheStores.remove(grp.groupId()); - assert old != null : "Missing cache store holder [cache=" + cacheCtx.name() + + assert old != null : "Missing cache store holder [cache=" + grp.cacheOrGroupName() + ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.igniteInstanceName() + ']'; IgniteCheckedException ex = shutdown(old, /*clean files if destroy*/destroy, null); @@ -207,17 +242,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void onPartitionCreated(int cacheId, int partId) throws IgniteCheckedException { + @Override public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException { // No-op. } /** {@inheritDoc} */ - @Override public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException { + @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException { assert partId <= PageIdAllocator.MAX_PARTITION_ID; - PageStore store = getStore(cacheId, partId); + PageStore store = getStore(grpId, partId); - assert store instanceof FilePageStore; + assert store instanceof FilePageStore : store; ((FilePageStore)store).truncate(tag); } @@ -286,12 +321,31 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** + * @param grpDesc Cache group descriptor. + * @param ccfg Cache configuration. + * @return Cache work directory. + */ + private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) { + String dirName; + + if (grpDesc.sharedGroup()) + dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName(); + else + dirName = CACHE_DIR_PREFIX + ccfg.getName(); + + return new File(storeWorkDir, dirName); + } + + /** + * @param grpDesc Cache group descriptor. * @param ccfg Cache configuration. * @return Cache store holder. * @throws IgniteCheckedException If failed. */ - private CacheStoreHolder initForCache(CacheConfiguration ccfg) throws IgniteCheckedException { - File cacheWorkDir = new File(storeWorkDir, CACHE_DIR_PREFIX + ccfg.getName()); + private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException { + assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName(); + + File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); boolean dirExisted = false; @@ -348,32 +402,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen dirExisted = true; } - File file = new File(cacheWorkDir, CACHE_CONF_FILENAME); - - if (!file.exists() || file.length() == 0) { - try { - file.createNewFile(); - - try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) { - marshaller.marshal(ccfg, stream); - } - } - catch (IOException ex) { - throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex); - } - } - File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME); if (dirExisted && !idxFile.exists()) - cachesWithoutIdx.add(CU.cacheId(ccfg.getName())); + grpsWithoutIdx.add(grpDesc.groupId()); FilePageStore idxStore = new FilePageStore( PageMemory.FLAG_IDX, idxFile, cctx.kernalContext().config().getMemoryConfiguration()); - FilePageStore[] partStores = new FilePageStore[ccfg.getAffinity().partitions()]; + FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()]; for (int partId = 0; partId < partStores.length; partId++) { FilePageStore partStore = new FilePageStore( @@ -422,52 +461,74 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public Set<String> savedCacheNames() { + @Override public Map<String, CacheConfiguration> readCacheConfigurations() throws IgniteCheckedException { if (cctx.kernalContext().clientNode()) - return Collections.emptySet(); + return Collections.emptyMap(); File[] files = storeWorkDir.listFiles(); if (files == null) - return Collections.emptySet(); + return Collections.emptyMap(); - Set<String> cacheNames = new HashSet<>(); + Map<String, CacheConfiguration> ccfgs = new HashMap<>(); for (File file : files) { - if (file.isDirectory() && file.getName().startsWith(CACHE_DIR_PREFIX)) { - File conf = new File(file, CACHE_CONF_FILENAME); - if (conf.exists() && conf.length() > 0) { - String name = file.getName().substring(CACHE_DIR_PREFIX.length()); + if (file.isDirectory()) { + if (file.getName().startsWith(CACHE_DIR_PREFIX)) { + File conf = new File(file, CACHE_CONF_FILENAME); - // TODO remove when fixed null cache names. - if ("null".equals(name)) - name = null; + if (conf.exists() && conf.length() > 0) { + CacheConfiguration ccfg = readCacheConfig(conf); - cacheNames.add(name); + ccfgs.put(ccfg.getName(), ccfg); + } } + else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX)) + readCacheGroupCaches(file, ccfgs); } } - return cacheNames; + return ccfgs; } - /** {@inheritDoc} */ - @Override public CacheConfiguration readConfiguration(String cacheName) { - File file = new File(storeWorkDir, CACHE_DIR_PREFIX + cacheName); + /** + * @param grpDir Group directory. + * @param ccfgs Cache configurations. + * @throws IgniteCheckedException If failed. + */ + private void readCacheGroupCaches(File grpDir, Map<String, CacheConfiguration> ccfgs) throws IgniteCheckedException { + File[] files = grpDir.listFiles(); + + if (files == null) + return; + + for (File file : files) { + if (!file.isDirectory() && file.getName().endsWith(CACHE_CONF_FILENAME) && file.length() > 0) { + CacheConfiguration ccfg = readCacheConfig(file); - assert file.exists() && file.isDirectory(); + ccfgs.put(ccfg.getName(), ccfg); + } + } + } - try (InputStream stream = new BufferedInputStream(new FileInputStream(new File(file, CACHE_CONF_FILENAME)))) { + /** + * @param conf File with stored configuration. + * @return Cache configuration. + * @throws IgniteCheckedException If failed. + */ + private CacheConfiguration readCacheConfig(File conf) throws IgniteCheckedException { + try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) { return marshaller.unmarshal(stream, U.resolveClassLoader(igniteCfg)); } - catch (IOException | IgniteCheckedException e) { - throw new IllegalStateException("Failed to read cache configuration from disk for cache: " + cacheName, e); + catch (IOException e) { + throw new IgniteCheckedException("Failed to read cache configuration from disk for cache: " + + conf.getAbsolutePath(), e); } } /** {@inheritDoc} */ - @Override public boolean hasIndexStore(int cacheId) { - return !cachesWithoutIdx.contains(cacheId); + @Override public boolean hasIndexStore(int grpId) { + return !grpsWithoutIdx.contains(grpId); } /** @@ -537,19 +598,19 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param partId Partition ID. * @return Page store for the corresponding parameters. * @throws IgniteCheckedException If cache or partition with the given ID was not created. * * Note: visible for testing. */ - public PageStore getStore(int cacheId, int partId) throws IgniteCheckedException { - CacheStoreHolder holder = idxCacheStores.get(cacheId); + public PageStore getStore(int grpId, int partId) throws IgniteCheckedException { + CacheStoreHolder holder = idxCacheStores.get(grpId); if (holder == null) throw new IgniteCheckedException("Failed to get page store for the given cache ID " + - "(cache has not been started): " + cacheId); + "(cache has not been started): " + grpId); if (partId == PageIdAllocator.INDEX_PARTITION) return holder.idxStore; @@ -561,7 +622,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen if (store == null) throw new IgniteCheckedException("Failed to get page store for the given partition ID " + - "(partition has not been created) [cacheId=" + cacheId + ", partId=" + partId + ']'); + "(partition has not been created) [grpId=" + grpId + ", partId=" + partId + ']'); return store; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java index e8ae554..ef84d83 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java @@ -131,16 +131,16 @@ public interface PageMemoryEx extends PageMemory { public int invalidate(int cacheId, int partId); /** - * Clears internal metadata of destroyed cache. + * Clears internal metadata of destroyed cache group. * - * @param cacheId Cache ID. + * @param grpId Cache group ID. */ - public void onCacheDestroyed(int cacheId); + public void onCacheGroupDestroyed(int grpId); /** * Asynchronously clears pages satisfying the given predicate. * - * @param pred Predicate for cacheId, pageId and partition tag. + * @param pred Predicate for cache group id, pageId and partition tag. * @param cleanDirty Flag indicating that dirty pages collection should be cleaned. * @return Future that will be completed when all pages are cleared. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java index 21ebcbf..a791ec9 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java @@ -970,12 +970,12 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public void onCacheDestroyed(int cacheId) { + @Override public void onCacheGroupDestroyed(int grpId) { for (Segment seg : segments) { seg.writeLock().lock(); try { - seg.resetPartTags(cacheId); + seg.resetPartTags(grpId); } finally { seg.writeLock().unlock(); @@ -1935,7 +1935,7 @@ public class PageMemoryImpl implements PageMemoryEx { } } - private void resetPartTags(int cacheId) { + private void resetPartTags(int grpId) { assert getWriteHoldCount() > 0; Iterator<T2<Integer, Integer>> iter = partitionTagMap.keySet().iterator(); @@ -1943,7 +1943,7 @@ public class PageMemoryImpl implements PageMemoryEx { while (iter.hasNext()) { T2<Integer, Integer> t = iter.next(); - if (t.get1() == cacheId) + if (t.get1() == grpId) iter.remove(); } }
