http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bf4691b..45e6a8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.database.IgniteCacheSnapshotManager; @@ -176,6 +177,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { private GridCacheSharedContext<?, ?> sharedCtx; /** */ + private final ConcurrentMap<Integer, CacheGroupContext> cacheGrps = new ConcurrentHashMap<>(); + + /** */ private final Map<String, GridCacheAdapter<?, ?>> caches; /** Caches stopped from onKernalStop callback. */ @@ -526,7 +530,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ private List<GridCacheManager> dhtManagers(GridCacheContext ctx) { return F.asList(ctx.store(), ctx.events(), ctx.evicts(), ctx.queries(), ctx.continuousQueries(), - ctx.dr(), ctx.offheap()); + ctx.dr()); } /** @@ -538,7 +542,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (ctx.config().getCacheMode() == LOCAL || !isNearEnabled(ctx)) return Collections.emptyList(); else - return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store(), ctx.offheap()); + return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store()); } /** @@ -552,7 +556,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { prepare(cfg, cfg.getAffinityMapper(), false); prepare(cfg, cfg.getEvictionFilter(), false); prepare(cfg, cfg.getInterceptor(), false); - prepare(cfg, cfg.getTopologyValidator(), false); NearCacheConfiguration nearCfg = cfg.getNearConfiguration(); @@ -590,7 +593,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { cleanup(cfg, cfg.getAffinityMapper(), false); cleanup(cfg, cfg.getEvictionFilter(), false); cleanup(cfg, cfg.getInterceptor(), false); - cleanup(cfg, cfg.getTopologyValidator(), false); cleanup(cfg, cctx.store().configuredStore(), false); if (!CU.isUtilityCache(cfg.getName()) && !CU.isSystemCache(cfg.getName())) { @@ -607,6 +609,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param grp Cache group. + */ + private void cleanup(CacheGroupContext grp) { + CacheConfiguration cfg = grp.config(); + + for (Object obj : grp.configuredUserObjects()) + cleanup(cfg, obj, false); + } + + /** * @param cfg Cache configuration. * @param rsrc Resource. * @param near Near flag. @@ -720,10 +732,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { else stopSeq.addFirst(cfg.getName()); - caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, sql, (byte)0)); + caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, sql, 0)); } else - templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, false, (byte)0)); + templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, false, 0)); } /** @@ -760,23 +772,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert !ctx.config().isDaemon(); if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { - Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); + Map<String, CacheConfiguration> ccfgs = sharedCtx.pageStore().readCacheConfigurations(); - savedCacheNames.removeAll(caches.keySet()); + for (String cache : caches.keySet()) + ccfgs.remove(cache); - savedCacheNames.removeAll(internalCaches); + for (String cache : internalCaches) + ccfgs.remove(cache); - if (!F.isEmpty(savedCacheNames)) { + if (!F.isEmpty(ccfgs)) { if (log.isInfoEnabled()) - log.info("Register persistent caches: " + savedCacheNames); - - for (String name : savedCacheNames) { - CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); + log.info("Register persistent caches: " + ccfgs.keySet()); - // TODO IGNITE-5306 - set correct SQL flag below. - if (cfg != null) - addCacheOnJoin(cfg, false, caches, templates); - } + // TODO IGNITE-5306 - set correct SQL flag below. + for (CacheConfiguration ccfg : ccfgs.values()) + addCacheOnJoin(ccfg, false, caches, templates); } } } @@ -800,6 +810,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { internalCaches.add(CU.ATOMICS_CACHE_NAME); } + /** + * @param grpId Group ID. + * @return Cache group. + */ + @Nullable public CacheGroupContext cacheGroup(int grpId) { + return cacheGrps.get(grpId); + } + + /** + * @return Cache groups. + */ + public Collection<CacheGroupContext> cacheGroups() { + return cacheGrps.values(); + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { @@ -824,14 +849,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (CacheConfiguration conf : ctx.config().getCacheConfiguration()) { assert conf.getName() != null; - for (DynamicCacheDescriptor desc : cacheDescriptors()) { + for (DynamicCacheDescriptor desc : cacheDescriptors().values()) { CacheConfiguration c = desc.cacheConfiguration(); - IgnitePredicate filter = c.getNodeFilter(); + IgnitePredicate filter = desc.groupDescriptor().config().getNodeFilter(); if (c.getName().equals(conf.getName()) && ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) || CU.isSystemCache(c.getName()))) { - tmpCacheCfg.add(c); break; @@ -968,6 +992,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopCache(cache, cancel, false); } + for (CacheGroupContext grp : cacheGrps.values()) + stopCacheGroup(grp.groupId()); + cachesInfo.clearCaches(); } @@ -990,6 +1017,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No new caches should be added after this point. exch.onKernalStop(cancel); + for (CacheGroupContext grp : cacheGrps.values()) + grp.onKernalStop(); + onKernalStopCaches(cancel); cancelFutures(); @@ -1009,11 +1039,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cancel Cancel. */ public void onKernalStopCaches(boolean cancel){ - for (GridCacheAdapter<?, ?> cache : caches.values()) { - GridCacheAffinityManager aff = cache.context().affinity(); + IgniteCheckedException affErr = + new IgniteCheckedException("Failed to wait for topology update, node is stopping."); - if (aff != null) - aff.cancelFutures(); + for (CacheGroupContext grp : cacheGrps.values()) { + GridAffinityAssignmentCache aff = grp.affinity(); + + aff.cancelFutures(affErr); } for (String cacheName : stopSeq) { @@ -1049,6 +1081,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (IgniteInternalFuture fut : pendingTemplateFuts.values()) ((GridFutureAdapter)fut).onDone(err); + for (CacheGroupContext grp : cacheGrps.values()) + grp.onDisconnected(reconnectFut); + for (GridCacheAdapter cache : caches.values()) { GridCacheContext cctx = cache.context(); @@ -1068,37 +1103,35 @@ public class GridCacheProcessor extends GridProcessorAdapter { cachesInfo.onDisconnect(); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); - - GridCompoundFuture<?, ?> stopFut = null; + /** + * @param cctx Cache context. + * @param stoppedCaches List where stopped cache should be added. + */ + private void stopCacheOnReconnect(GridCacheContext cctx, List<GridCacheAdapter> stoppedCaches) { + cctx.gate().reconnected(true); - Set<String> stoppedCaches = cachesInfo.onReconnected(); + sharedCtx.removeCacheContext(cctx); - for (final GridCacheAdapter cache : caches.values()) { - boolean stopped = stoppedCaches.contains(cache.name()); + caches.remove(cctx.name()); + jCacheProxies.remove(cctx.name()); - if (stopped) { - cache.context().gate().reconnected(true); + stoppedCaches.add(cctx.cache()); + } - sharedCtx.removeCacheContext(cache.ctx); + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); - caches.remove(cache.name()); - jCacheProxies.remove(cache.name()); + ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(); - IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() { - @Override public void run() { - onKernalStop(cache, true); - stopCache(cache, true, false); - } - }); + final List<GridCacheAdapter> stoppedCaches = new ArrayList<>(); - if (stopFut == null) - stopFut = new GridCompoundFuture<>(); + for (final GridCacheAdapter cache : caches.values()) { + boolean stopped = reconnectRes.stoppedCacheGroups().contains(cache.context().groupId()) + || reconnectRes.stoppedCaches().contains(cache.name()); - stopFut.add((IgniteInternalFuture)fut); - } + if (stopped) + stopCacheOnReconnect(cache.context(), stoppedCaches); else { cache.onReconnected(); @@ -1110,7 +1143,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(cctx.name()); - assert desc != null; + assert desc != null : cctx.name(); ctx.query().onCacheStop0(cctx.name()); ctx.query().onCacheStart0(cctx, desc.schema()); @@ -1118,13 +1151,37 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + final Set<Integer> stoppedGrps = reconnectRes.stoppedCacheGroups(); + + for (CacheGroupContext grp : cacheGrps.values()) { + if (stoppedGrps.contains(grp.groupId())) + cacheGrps.remove(grp.groupId()); + else + grp.onReconnected(); + } + sharedCtx.onReconnected(); for (GridCacheAdapter cache : reconnected) cache.context().gate().reconnected(false); - if (stopFut != null) - stopFut.markInitialized(); + IgniteInternalFuture<?> stopFut = null; + + if (!stoppedCaches.isEmpty()) { + stopFut = ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + for (GridCacheAdapter cache : stoppedCaches) { + CacheGroupContext grp = cache.context().group(); + + onKernalStop(cache, true); + stopCache(cache, true, false); + + if (!grp.hasCaches()) + stopCacheGroup(grp); + } + } + }); + } return stopFut; } @@ -1138,11 +1195,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { private void startCache(GridCacheAdapter<?, ?> cache, QuerySchema schema) throws IgniteCheckedException { GridCacheContext<?, ?> cacheCtx = cache.context(); - ctx.continuous().onCacheStart(cacheCtx); - - if (sharedCtx.pageStore() != null && !ctx.clientNode()) - sharedCtx.pageStore().initializeForCache(cacheCtx.config()); - CacheConfiguration cfg = cacheCtx.config(); // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set. @@ -1173,18 +1225,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { log.debug("Started DHT cache: " + dhtCtx.cache().name()); } + ctx.continuous().onCacheStart(cacheCtx); + cacheCtx.cache().start(); ctx.query().onCacheStart(cacheCtx, schema); cacheCtx.onStarted(); - if (log.isInfoEnabled()) - log.info("Started cache [name=" + U.maskName(cfg.getName()) + + + if (log.isInfoEnabled()) { + log.info("Started cache [name=" + cfg.getName() + + (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") + ", memoryPolicyName=" + cfg.getMemoryPolicyName() + ", mode=" + cfg.getCacheMode() + - ", atomicity=" + cfg.getAtomicityMode() + ']' - ); + ", atomicity=" + cfg.getAtomicityMode() + ']'); +} } /** @@ -1248,10 +1304,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.kernalContext().cache().context().snapshot().onCacheStop(ctx); - U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore())); + ctx.group().stopCache(ctx, destroy); - if (log.isInfoEnabled()) - log.info("Stopped cache: " + cache.name()); + U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), cache.configuration(), ctx.store().configuredStore())); + + if (log.isInfoEnabled()) { + if (ctx.group().sharedGroup()) + log.info("Stopped cache [cacheName=" + cache.name() + ", group=" + ctx.group().name() + ']'); + else + log.info("Stopped cache [cacheName=" + cache.name() + ']'); + } cleanup(ctx); } @@ -1340,6 +1402,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cfg Cache configuration to use to create cache. + * @param grp Cache group. * @param pluginMgr Cache plugin manager. * @param desc Cache descriptor. * @param locStartTopVer Current topology version. @@ -1350,6 +1413,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed to create cache. */ private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, + CacheGroupContext grp, @Nullable CachePluginManager pluginMgr, DynamicCacheDescriptor desc, AffinityTopologyVersion locStartTopVer, @@ -1394,7 +1458,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { prepare(cfg, toPrepare); - U.startLifecycleAware(lifecycleAwares(cfg, cfgStore)); + U.startLifecycleAware(lifecycleAwares(grp, cfg, cfgStore)); boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg); @@ -1410,46 +1474,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class); CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class); - IgniteCacheOffheapManager offheapMgr; - - if (ctx.config().getPersistentStoreConfiguration() != null) { - ClassLoader clsLdr = U.gridClassLoader(); - - try { - offheapMgr = (IgniteCacheOffheapManager) clsLdr - .loadClass("org.apache.ignite.internal.processors.cache.database.GridCacheOffheapManager") - .getConstructor() - .newInstance(); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to initialize offheap manager", e); - } - } - else - offheapMgr = new IgniteCacheOffheapManagerImpl(); - - storeMgr.initialize(cfgStore, sesHolders); - String memPlcName = cfg.getMemoryPolicyName(); - - MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName); - FreeList freeList = sharedCtx.database().freeList(memPlcName); - ReuseList reuseList = sharedCtx.database().reuseList(memPlcName); - GridCacheContext<?, ?> cacheCtx = new GridCacheContext( ctx, sharedCtx, cfg, + grp, desc.cacheType(), locStartTopVer, - desc.receivedFrom(), affNode, updatesAllowed, - memPlc, - freeList, - reuseList, - /* * Managers in starting order! * =========================== @@ -1462,7 +1497,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { dataStructuresMgr, ttlMgr, drMgr, - offheapMgr, rslvrMgr, pluginMgr, affMgr @@ -1518,14 +1552,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { case TRANSACTIONAL: { cache = cacheCtx.affinityNode() ? new GridDhtColocatedCache(cacheCtx) : - new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap()); break; } case ATOMIC: { cache = cacheCtx.affinityNode() ? new GridDhtAtomicCache(cacheCtx) : - new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap()); break; } @@ -1574,15 +1608,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, sharedCtx, cfg, + grp, desc.cacheType(), locStartTopVer, - desc.receivedFrom(), affNode, true, - memPlc, - freeList, - reuseList, - /* * Managers in starting order! * =========================== @@ -1595,7 +1625,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { dataStructuresMgr, ttlMgr, drMgr, - offheapMgr, rslvrMgr, pluginMgr, affMgr @@ -1613,7 +1642,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridDhtCache dhtCache = cacheCtx.affinityNode() ? new GridDhtCache(cacheCtx) : - new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + new GridDhtCache(cacheCtx, new GridNoStorageCacheMap()); dhtCache.near(near); @@ -1630,7 +1659,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ? new GridDhtAtomicCache(cacheCtx) : - new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx)); + new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap()); dhtCache.near(near); @@ -1663,11 +1692,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of started cache names. */ public Collection<String> cacheNames() { - return F.viewReadOnly(cacheDescriptors(), new IgniteClosure<DynamicCacheDescriptor, String>() { - @Override public String apply(DynamicCacheDescriptor desc) { - return desc.cacheConfiguration().getName(); - } - }); + return F.viewReadOnly(cacheDescriptors().values(), + new IgniteClosure<DynamicCacheDescriptor, String>() { + @Override public String apply(DynamicCacheDescriptor desc) { + return desc.cacheConfiguration().getName(); + } + }); } /** @@ -1712,7 +1742,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Collection of currently started public cache names */ public Collection<String> publicCacheNames() { - return F.viewReadOnly(cacheDescriptors(), + return F.viewReadOnly(cacheDescriptors().values(), new IgniteClosure<DynamicCacheDescriptor, String>() { @Override public String apply(DynamicCacheDescriptor desc) { return desc.cacheConfiguration().getName(); @@ -1751,6 +1781,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { prepareCacheStart( + cacheDesc.groupDescriptor(), cacheDesc.cacheConfiguration(), nearCfg, cacheDesc, @@ -1771,6 +1802,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = t.get1(); prepareCacheStart( + desc.groupDescriptor(), desc.cacheConfiguration(), t.get2(), desc, @@ -1795,10 +1827,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (started != null) { for (DynamicCacheDescriptor desc : started) { - IgnitePredicate<ClusterNode> filter = desc.cacheConfiguration().getNodeFilter(); + IgnitePredicate<ClusterNode> filter = desc.groupDescriptor().config().getNodeFilter(); if (CU.affinityNode(ctx.discovery().localNode(), filter)) { prepareCacheStart( + desc.groupDescriptor(), desc.cacheConfiguration(), null, desc, @@ -1813,6 +1846,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param grpDesc Cache group descriptor. * @param startCfg Start configuration. * @param reqNearCfg Near configuration if specified for client cache start request. * @param desc Cache descriptor. @@ -1821,6 +1855,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ private void prepareCacheStart( + CacheGroupDescriptor grpDesc, CacheConfiguration startCfg, @Nullable NearCacheConfiguration reqNearCfg, DynamicCacheDescriptor desc, @@ -1844,7 +1879,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.setNearConfiguration(null); } - else if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) + else if (CU.affinityNode(ctx.discovery().localNode(), grpDesc.config().getNodeFilter())) affNode = true; else { affNode = false; @@ -1852,7 +1887,40 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.setNearConfiguration(reqNearCfg); } + if (sharedCtx.pageStore() != null && affNode) + sharedCtx.pageStore().initializeForCache(grpDesc, startCfg); + + String grpName = startCfg.getGroupName(); + + CacheGroupContext grp = null; + + if (grpName != null) { + for (CacheGroupContext grp0 : cacheGrps.values()) { + if (grp0.sharedGroup() && grpName.equals(grp0.name())) { + grp = grp0; + + break; + } + } + + if (grp == null) { + grp = startCacheGroup(grpDesc, + desc.cacheType(), + affNode, + cacheObjCtx, + exchTopVer); + } + } + else { + grp = startCacheGroup(grpDesc, + desc.cacheType(), + affNode, + cacheObjCtx, + exchTopVer); + } + GridCacheContext cacheCtx = createCache(ccfg, + grp, null, desc, exchTopVer, @@ -1870,6 +1938,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cache, schema != null ? schema : new QuerySchema()); + grp.onCacheStarted(cacheCtx); + onKernalStart(cache); if (proxyRestart) @@ -1877,6 +1947,56 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param desc Group descriptor. + * @param cacheType Cache type. + * @param affNode Affinity node flag. + * @param cacheObjCtx Cache object context. + * @param exchTopVer Current topology version. + * @return Started cache group. + * @throws IgniteCheckedException If failed. + */ + private CacheGroupContext startCacheGroup( + CacheGroupDescriptor desc, + CacheType cacheType, + boolean affNode, + CacheObjectContext cacheObjCtx, + AffinityTopologyVersion exchTopVer) + throws IgniteCheckedException { + CacheConfiguration cfg = new CacheConfiguration(desc.config()); + + String memPlcName = cfg.getMemoryPolicyName(); + + MemoryPolicy memPlc = sharedCtx.database().memoryPolicy(memPlcName); + FreeList freeList = sharedCtx.database().freeList(memPlcName); + ReuseList reuseList = sharedCtx.database().reuseList(memPlcName); + + CacheGroupContext grp = new CacheGroupContext(sharedCtx, + desc.groupId(), + desc.receivedFrom(), + cacheType, + cfg, + affNode, + memPlc, + cacheObjCtx, + freeList, + reuseList, + exchTopVer); + + for (Object obj : grp.configuredUserObjects()) + prepare(cfg, obj, false); + + U.startLifecycleAware(grp.configuredUserObjects()); + + grp.start(); + + CacheGroupContext old = cacheGrps.put(desc.groupId(), grp); + + assert old == null : old.name(); + + return grp; + } + + /** * @param req Stop request. */ void blockGateway(DynamicCacheChangeRequest req) { @@ -1958,8 +2078,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { Throwable err ) { ExchangeActions actions = new ExchangeActions(){ - @Override List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) { - return Collections.singletonList(act.request()); + @Override List<ActionData> closeRequests(UUID nodeId) { + return Collections.singletonList(act); } }; @@ -1992,11 +2112,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { } if (exchActions != null && (err == null || forceClose)) { - Collection<IgniteBiTuple<GridCacheContext, Boolean>> stopped = null; + Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps = null; - GridCacheContext<?, ?> stopCtx = null; - boolean destroy = false; for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) { + GridCacheContext<?, ?> stopCtx; + boolean destroy; + stopGateway(action.request()); sharedCtx.database().checkpointReadLock(); @@ -2009,16 +2130,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.database().checkpointReadUnlock(); } - if (stopCtx != null) { - if (stopped == null) - stopped = new ArrayList<>(); + if (stopCtx != null && !stopCtx.group().hasCaches()) { + if (stoppedGrps == null) + stoppedGrps = new ArrayList<>(); - stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy)); + stoppedGrps.add(F.t(stopCtx.group(), destroy)); } } - for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) { - String cacheName = req.cacheName(); + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) + stopCacheGroup(grpDesc.groupId()); + + for (ExchangeActions.ActionData req : exchActions.closeRequests(ctx.localNodeId())) { + String cacheName = req.request().cacheName(); IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(cacheName); @@ -2031,7 +2155,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { jCacheProxies.putIfAbsent(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false)); } else { - if (req.restart()) + if (req.request().restart()) proxy.restart(); proxy.context().gate().onStopped(); @@ -2039,8 +2163,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.database().checkpointReadLock(); try { - stopCtx = prepareCacheStop(req, forceClose); - destroy = req.destroy(); + GridCacheContext<?, ?> stopCtx = prepareCacheStop(req.request(), forceClose); + + if (stopCtx != null && !stopCtx.group().hasCaches()) { + assert !stopCtx.group().affinityNode() : stopCtx.name(); + + stopCacheGroup(stopCtx.groupId()); + } + } finally { sharedCtx.database().checkpointReadUnlock(); @@ -2048,24 +2178,37 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - if (stopCtx != null) { - if (stopped == null) - stopped = new ArrayList<>(); - - stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy)); - } - if (forceClose) - completeCacheStartFuture(req, false, err); - + completeCacheStartFuture(req.request(), false, err); } - if (stopped != null && !sharedCtx.kernalContext().clientNode()) - sharedCtx.database().onCachesStopped(stopped); + if (stoppedGrps != null && !sharedCtx.kernalContext().clientNode()) + sharedCtx.database().onCacheGroupsStopped(stoppedGrps); } } /** + * @param grpId Group ID. + */ + private void stopCacheGroup(int grpId) { + CacheGroupContext grp = cacheGrps.remove(grpId); + + if (grp != null) + stopCacheGroup(grp); + } + + /** + * @param grp Cache group. + */ + private void stopCacheGroup(CacheGroupContext grp) { + grp.stopGroup(); + + U.stopLifecycleAware(log, grp.configuredUserObjects()); + + cleanup(grp); + } + + /** * @param cacheName Cache name. * @param deploymentId Future deployment ID. */ @@ -2607,17 +2750,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!ctx.config().isDaemon() && sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { - Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); + Map<String, CacheConfiguration> savedCaches = sharedCtx.pageStore().readCacheConfigurations(); - for (String name : savedCacheNames) { - CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); - - if (cfg != null) - reqs.add(createRequest(cfg, false)); - } + for (CacheConfiguration cfg : savedCaches.values()) + reqs.add(createRequest(cfg, false)); for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { - if (!savedCacheNames.contains(cfg.getName())) + if (!savedCaches.containsKey(cfg.getName())) reqs.add(createRequest(cfg, true)); } } @@ -2716,6 +2855,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } } + if (req.start() && req.startCacheConfiguration() != null) { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + try { + cachesInfo.validateStartCacheConfiguration(ccfg); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + } if (fut.isDone()) continue; @@ -2831,7 +2980,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } else if (rebalanceOrder < 0) throw new IgniteCheckedException("Rebalance order cannot be negative for cache (fix configuration and restart " + - "the node) [cacheName=" + U.maskName(cfg.getName()) + ", rebalanceOrder=" + rebalanceOrder + ']'); + "the node) [cacheName=" + cfg.getName() + ", rebalanceOrder=" + rebalanceOrder + ']'); } return maxOrder; @@ -2848,7 +2997,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) { if (!node.isClient()) { - for (DynamicCacheDescriptor desc : cacheDescriptors()) { + for (DynamicCacheDescriptor desc : cacheDescriptors().values()) { CacheConfiguration cfg = desc.cacheConfiguration(); if (cfg.getAffinity() instanceof RendezvousAffinityFunction) { @@ -2862,11 +3011,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) { String errMsg = "Failed to add node to topology because it has the same hash code for " + "partitioned affinity as one of existing nodes [cacheName=" + - U.maskName(cfg.getName()) + ", existingNodeId=" + topNode.id() + ']'; + cfg.getName() + ", existingNodeId=" + topNode.id() + ']'; String sndMsg = "Failed to add node to topology because it has the same hash code for " + "partitioned affinity as one of existing nodes [cacheName=" + - U.maskName(cfg.getName()) + ", existingNodeId=" + topNode.id() + ']'; + cfg.getName() + ", existingNodeId=" + topNode.id() + ']'; return new IgniteNodeValidationResult(topNode.id(), errMsg, sndMsg); } @@ -3125,8 +3274,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @return Cache descriptors. */ - public Collection<DynamicCacheDescriptor> cacheDescriptors() { - return cachesInfo.registeredCaches().values(); + public Map<String, DynamicCacheDescriptor> cacheDescriptors() { + return cachesInfo.registeredCaches(); + } + + /** + * @return Cache group descriptors. + */ + public Map<Integer, CacheGroupDescriptor> cacheGroupDescriptors() { + return cachesInfo.registeredCacheGroups(); } /** @@ -3134,7 +3290,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache descriptor. */ @Nullable public DynamicCacheDescriptor cacheDescriptor(int cacheId) { - for (DynamicCacheDescriptor cacheDesc : cacheDescriptors()) { + for (DynamicCacheDescriptor cacheDesc : cacheDescriptors().values()) { CacheConfiguration ccfg = cacheDesc.cacheConfiguration(); assert ccfg != null : cacheDesc; @@ -3428,19 +3584,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param grp Cache group. * @param ccfg Cache configuration. * @param objs Extra components. * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface. */ - private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object... objs) { + private Iterable<Object> lifecycleAwares(CacheGroupContext grp, CacheConfiguration ccfg, Object... objs) { Collection<Object> ret = new ArrayList<>(7 + objs.length); - ret.add(ccfg.getAffinity()); + if (grp.affinityFunction() != ccfg.getAffinity()) + ret.add(ccfg.getAffinity()); + ret.add(ccfg.getAffinityMapper()); ret.add(ccfg.getEvictionFilter()); ret.add(ccfg.getEvictionPolicy()); ret.add(ccfg.getInterceptor()); - ret.add(ccfg.getTopologyValidator()); NearCacheConfiguration nearCfg = ccfg.getNearConfiguration(); @@ -3811,12 +3969,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { try { - for (GridCacheContext cacheCtx : sharedCtx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.affinityNode()) { + for (CacheGroupContext grp : sharedCtx.cache().cacheGroups()) { + if (!grp.isLocal() && grp.affinityNode()) { GridDhtPartitionTopology top = null; try { - top = cacheCtx.topology(); + top = grp.topology(); } catch (IllegalStateException ignore) { // Cache stopped.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 04f5b85..be1ab3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -415,7 +415,7 @@ public class GridCacheSharedContext<K, V> { /** * @param cacheCtx Cache context to remove. */ - public void removeCacheContext(GridCacheContext cacheCtx) { + void removeCacheContext(GridCacheContext cacheCtx) { int cacheId = cacheCtx.cacheId(); ctxMap.remove(cacheId, cacheCtx); @@ -426,7 +426,7 @@ public class GridCacheSharedContext<K, V> { locStoreCnt.decrementAndGet(); // Safely clean up the message listeners. - ioMgr.removeHandlers(cacheId); + ioMgr.removeCacheHandlers(cacheId); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 614b3e3..e7e6aec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -38,7 +38,13 @@ import org.jsr166.LongAdder8; */ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ - private GridConcurrentSkipListSetEx pendingEntries; + private GridConcurrentSkipListSetEx pendingEntries; + + /** */ + private boolean eagerTtlEnabled; + + /** */ + private GridCacheContext dhtCtx; /** */ private final IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> expireC = @@ -70,6 +76,8 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { + dhtCtx = cctx.isNear() ? cctx.near().dht().context() : cctx; + boolean cleanupDisabled = cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl() || CU.isAtomicsCache(cctx.name()) || @@ -79,11 +87,20 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { if (cleanupDisabled) return; + eagerTtlEnabled = true; + cctx.shared().ttl().register(this); pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? new GridConcurrentSkipListSetEx() : null; } + /** + * @return {@code True} if eager ttl is enabled for cache. + */ + public boolean eagerTtlEnabled() { + return eagerTtlEnabled; + } + /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { if (pendingEntries != null) @@ -153,7 +170,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { try { if (pendingEntries != null) { - //todo may be not only for near? may be for local too. GridNearCacheAdapter nearCache = cctx.near(); GridCacheVersion obsoleteVer = null; @@ -178,7 +194,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { } } - boolean more = cctx.offheap().expire(expireC, amount); + boolean more = cctx.offheap().expire(dhtCtx, expireC, amount); if (more) return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 4dc5f8e..0b11900 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -467,7 +467,7 @@ public class GridCacheUtils { * @return All nodes on which cache with the same name is started. */ public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) { - return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), AffinityTopologyVersion.NONE); + return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), AffinityTopologyVersion.NONE); } /** @@ -478,7 +478,7 @@ public class GridCacheUtils { * @return Affinity nodes. */ public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().cacheAffinityNodes(ctx.cacheId(), topOrder); + return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topOrder); } /** @@ -989,6 +989,44 @@ public class GridCacheUtils { } } } + /** + * @param cfg1 Existing configuration. + * @param cfg2 Cache configuration to start. + * @param attrName Short attribute name for error message. + * @param attrMsg Full attribute name for error message. + * @param val1 Attribute value in existing configuration. + * @param val2 Attribute value in starting configuration. + * @param fail If true throws IgniteCheckedException in case of attribute values mismatch, otherwise logs warning. + * @throws IgniteCheckedException If validation failed. + */ + public static void validateCacheGroupsAttributesMismatch(IgniteLogger log, + CacheConfiguration cfg1, + CacheConfiguration cfg2, + String attrName, + String attrMsg, + Object val1, + Object val2, + boolean fail) throws IgniteCheckedException { + if (F.eq(val1, val2)) + return; + + if (fail) { + throw new IgniteCheckedException(attrMsg + " mismatch for caches related to the same group " + + "[groupName=" + cfg1.getGroupName() + + ", existingCache=" + cfg1.getName() + + ", existing" + capitalize(attrName) + "=" + val1 + + ", startingCache=" + cfg2.getName() + + ", starting" + capitalize(attrName) + "=" + val2 + ']'); + } + else { + U.warn(log, attrMsg + " mismatch for caches related to the same group " + + "[groupName=" + cfg1.getGroupName() + + ", existingCache=" + cfg1.getName() + + ", existing" + capitalize(attrName) + "=" + val1 + + ", startingCache=" + cfg2.getName() + + ", starting" + capitalize(attrName) + "=" + val2 + ']'); + } + } /** * @param str String. http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java index 0d04ccb..e49be49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridChangeGlobalStateMessageResponse.java @@ -59,6 +59,16 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage { this.err = err; } + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + /** * */ @@ -106,13 +116,13 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage { } switch (writer.state()) { - case 3: + case 2: if (!writer.writeByteArray("errBytes", errBytes)) return false; writer.incrementState(); - case 4: + case 3: if (!writer.writeUuid("requestId", requestId)) return false; @@ -134,7 +144,7 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage { return false; switch (reader.state()) { - case 3: + case 2: errBytes = reader.readByteArray("errBytes"); if (!reader.isLastRead()) @@ -142,7 +152,7 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage { reader.incrementState(); - case 4: + case 3: requestId = reader.readUuid("requestId"); if (!reader.isLastRead()) @@ -162,7 +172,7 @@ public class GridChangeGlobalStateMessageResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 4; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java index 5e52c8b..77a9ba4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java @@ -22,30 +22,19 @@ import java.util.Collections; import java.util.Set; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.jetbrains.annotations.Nullable; /** * Empty cache map that will never store any entries. */ public class GridNoStorageCacheMap implements GridCacheConcurrentMap { - /** Context. */ - private final GridCacheContext ctx; - - /** - * @param ctx Cache context. - */ - public GridNoStorageCacheMap(GridCacheContext ctx) { - this.ctx = ctx; - } - /** {@inheritDoc} */ - @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) { + @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) { return null; } /** {@inheritDoc} */ - @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key, boolean create, boolean touch) { @@ -66,32 +55,27 @@ public class GridNoStorageCacheMap implements GridCacheConcurrentMap { } /** {@inheritDoc} */ - @Override public int publicSize() { + @Override public int publicSize(int cacheId) { return 0; } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { + @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { // No-op. } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { + @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { // No-op. } /** {@inheritDoc} */ - @Override public Iterable<GridCacheMapEntry> entries(CacheEntryPredicate... filter) { - return Collections.emptySet(); - } - - /** {@inheritDoc} */ - @Override public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter) { + @Override public Iterable<GridCacheMapEntry> entries(int cacheId, CacheEntryPredicate... filter) { return Collections.emptySet(); } /** {@inheritDoc} */ - @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) { + @Override public Set<GridCacheMapEntry> entrySet(int cacheId, CacheEntryPredicate... filter) { return Collections.emptySet(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 5bcefda..8951396 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Map; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -38,7 +39,36 @@ import org.jetbrains.annotations.Nullable; * */ @SuppressWarnings("WeakerAccess") -public interface IgniteCacheOffheapManager extends GridCacheManager { +public interface IgniteCacheOffheapManager { + /** + * @param ctx Context. + * @param grp Cache group. + * @throws IgniteCheckedException If failed. + */ + public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;; + + /** + * @param cctx Cache context. + * @throws IgniteCheckedException If failed. + */ + public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException; + + /** + * + */ + public void onKernalStop(); + + /** + * @param cacheId Cache ID. + * @param destroy Destroy data flag. + */ + public void stopCache(int cacheId, boolean destroy); + + /** + * + */ + public void stop(); + /** * Partition counter update callback. May be overridden by plugin-provided subclasses. * @@ -71,11 +101,12 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { @Nullable public CacheDataRow read(GridCacheMapEntry entry) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @return Cached row, if available, null otherwise. * @throws IgniteCheckedException If failed. */ - @Nullable public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException; + @Nullable public CacheDataRow read(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; /** * @param p Partition. @@ -97,6 +128,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { /** * @param store Data store. + * @throws IgniteCheckedException If failed. */ public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException; @@ -106,10 +138,12 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public boolean containsKey(GridCacheMapEntry entry); /** + * @param cctx Cache context. * @param c Closure. * @throws IgniteCheckedException If failed. */ - public boolean expire(IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) throws IgniteCheckedException; + public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) + throws IgniteCheckedException; /** * Gets the number of entries pending expire. @@ -120,15 +154,17 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public long expiredSize() throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param part Partition. * @param c Tree update closure. * @throws IgniteCheckedException If failed. */ - public void invoke(KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c) + public void invoke(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param val Value. * @param ver Version. @@ -138,6 +174,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { * @throws IgniteCheckedException If failed. */ public void update( + GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, @@ -147,22 +184,26 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { ) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param part Partition. * @throws IgniteCheckedException If failed. */ public void updateIndexes( + GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part ) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param partId Partition number. * @param part Partition. * @throws IgniteCheckedException If failed. */ public void remove( + GridCacheContext cctx, KeyCacheObject key, int partId, GridDhtLocalPartition part @@ -175,24 +216,37 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public int onUndeploy(ClassLoader ldr); /** + * @param cacheId Cache ID. * @param primary Primary entries flag. * @param backup Backup entries flag. * @param topVer Topology version. * @return Rows iterator. * @throws IgniteCheckedException If failed. */ - public GridIterator<CacheDataRow> iterator(boolean primary, boolean backup, final AffinityTopologyVersion topVer) + public GridIterator<CacheDataRow> cacheIterator(int cacheId, + boolean primary, + boolean backup, + final AffinityTopologyVersion topVer) throws IgniteCheckedException; /** + * @param cacheId Cache ID. * @param part Partition. * @return Partition data iterator. * @throws IgniteCheckedException If failed. */ - public GridIterator<CacheDataRow> iterator(final int part) throws IgniteCheckedException; + public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part) throws IgniteCheckedException; + + /** + * @param part Partition number. + * @return Iterator for given partition. + * @throws IgniteCheckedException If failed. + */ + public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException; /** * @param part Partition. + * @param topVer Topology version. * @param partCntr Partition counter to get historical data if available. * @return Partition data iterator. * @throws IgniteCheckedException If failed. @@ -201,6 +255,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { throws IgniteCheckedException; /** + * @param cctx Cache context. * @param primary Primary entries flag. * @param backup Backup entries flag. * @param topVer Topology version. @@ -208,40 +263,47 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { * @return Entries iterator. * @throws IgniteCheckedException If failed. */ - public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary, + public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator( + GridCacheContext cctx, + final boolean primary, final boolean backup, final AffinityTopologyVersion topVer, final boolean keepBinary) throws IgniteCheckedException; /** + * @param cacheId Cache ID. * @param part Partition. * @return Iterator. * @throws IgniteCheckedException If failed. */ - public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException; + public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part) + throws IgniteCheckedException; /** + * @param cacheId Cache ID. * @param primary Primary entries flag. * @param backup Backup entries flag. * @param topVer Topology version. * @return Entries count. * @throws IgniteCheckedException If failed. */ - public long entriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer) + public long cacheEntriesCount(int cacheId, boolean primary, boolean backup, AffinityTopologyVersion topVer) throws IgniteCheckedException; /** * Clears offheap entries. * + * @param cctx Cache context. * @param readers {@code True} to clear readers. */ - public void clear(boolean readers); + public void clearCache(GridCacheContext cctx, boolean readers); /** + * @param cacheId Cache ID. * @param part Partition. * @return Number of entries in given partition. */ - public long entriesCount(int part); + public long cacheEntriesCount(int cacheId, int part); /** * @return Offheap allocated size. @@ -254,28 +316,38 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public GridAtomicLong globalRemoveId(); /** + * @param cacheId Cache ID. * @param idxName Index name. * @return Root page for index tree. * @throws IgniteCheckedException If failed. */ - public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException; + public RootPage rootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException; /** + * @param cacheId Cache ID. * @param idxName Index name. * @throws IgniteCheckedException If failed. */ - public void dropRootPageForIndex(String idxName) throws IgniteCheckedException; + public void dropRootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException; /** + * @param idxName Index name. * @return Reuse list for index tree. + * @throws IgniteCheckedException If failed. */ public ReuseList reuseListForIndex(String idxName) throws IgniteCheckedException; /** - * + * @param cacheId Cache ID. + * @return Number of entries. + */ + public long cacheEntriesCount(int cacheId); + + /** + * @param part Partition. * @return Number of entries. */ - public long entriesCount(); + public int totalPartitionEntriesCount(int part); /** * @@ -304,13 +376,25 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { /** * @param size Size to init. * @param updCntr Update counter to init. + * @param cacheSizes Cache sizes if store belongs to group containing multiple caches. */ - void init(long size, long updCntr); + void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes); /** + * @param cacheId Cache ID. * @return Size. */ - int size(); + int cacheSize(int cacheId); + + /** + * @return Cache sizes if store belongs to group containing multiple caches. + */ + Map<Integer, Long> cacheSizes(); + + /** + * @return Total size. + */ + int fullSize(); /** * @return Update counter. @@ -333,6 +417,7 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public Long initialUpdateCounter(); /** + * @param cctx Cache context. * @param key Key. * @param val Value. * @param ver Version. @@ -341,13 +426,16 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { * @return New row. * @throws IgniteCheckedException If failed. */ - CacheDataRow createRow(KeyCacheObject key, + CacheDataRow createRow( + GridCacheContext cctx, + KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param val Value. * @param ver Version. @@ -355,38 +443,44 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { * @param oldRow Old row if available. * @throws IgniteCheckedException If failed. */ - void update(KeyCacheObject key, + void update( + GridCacheContext cctx, + KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @throws IgniteCheckedException If failed. */ - void updateIndexes(KeyCacheObject key) throws IgniteCheckedException; + void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param c Closure. * @throws IgniteCheckedException If failed. */ - public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException; + public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param partId Partition number. * @throws IgniteCheckedException If failed. */ - public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException; + public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @return Data row. * @throws IgniteCheckedException If failed. */ - public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException; + public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; /** * @return Data cursor. @@ -395,15 +489,34 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException; /** + * @param cacheId Cache ID. + * @return Data cursor. + * @throws IgniteCheckedException If failed. + */ + public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException; + + /** + * @param cacheId Cache ID. * @param lower Lower bound. * @param upper Upper bound. * @return Data cursor. * @throws IgniteCheckedException If failed. */ - public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower, + public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, KeyCacheObject upper) throws IgniteCheckedException; /** + * @param cacheId Cache ID. + * @param lower Lower bound. + * @param upper Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Data cursor. + * @throws IgniteCheckedException If failed. + */ + public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, + KeyCacheObject upper, Object x) throws IgniteCheckedException; + + /** * Destroys the tree associated with the store. * * @throws IgniteCheckedException If failed. @@ -411,6 +524,14 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public void destroy() throws IgniteCheckedException; /** + * Clears all the records associated with logical cache with given ID. + * + * @param cacheId Cache ID. + * @throws IgniteCheckedException If failed. + */ + public void clear(int cacheId) throws IgniteCheckedException; + + /** * @return Row store. */ public RowStore rowStore();
