ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ae45b4b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ae45b4b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ae45b4b Branch: refs/heads/ignite-5075-pds Commit: 7ae45b4b63c5374cd5a1bb770f3cff9d56fc0a9e Parents: 0840435 Author: sboikov <[email protected]> Authored: Mon May 22 13:58:41 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 22 18:11:28 2017 +0300 ---------------------------------------------------------------------- .../pagemem/store/IgnitePageStoreManager.java | 37 +-- .../pagemem/wal/record/CheckpointRecord.java | 22 +- .../cache/CacheGroupInfrastructure.java | 32 +- .../processors/cache/GridCacheMapEntry.java | 3 +- .../GridCachePartitionExchangeManager.java | 7 +- .../processors/cache/GridCacheProcessor.java | 54 ++-- .../processors/cache/GridCacheTtlManager.java | 2 +- .../cache/IgniteCacheOffheapManager.java | 11 +- .../cache/IgniteCacheOffheapManagerImpl.java | 55 ++-- .../cache/database/CacheDataRowAdapter.java | 13 +- .../IgniteCacheDatabaseSharedManager.java | 9 +- .../database/IgniteCacheSnapshotManager.java | 14 +- .../processors/cache/database/MetaStore.java | 6 +- .../processors/cache/database/RowStore.java | 15 +- .../dht/GridDhtPartitionTopologyImpl.java | 6 +- .../dht/preloader/GridDhtPartitionDemander.java | 45 +-- .../dht/preloader/GridDhtPartitionSupplier.java | 6 +- .../GridDhtPartitionSupplyMessage.java | 30 +- .../GridDhtPartitionsExchangeFuture.java | 13 +- .../preloader/GridDhtPartitionsFullMessage.java | 14 +- .../GridDhtPartitionsSingleMessage.java | 23 +- .../dht/preloader/GridDhtPreloader.java | 15 +- .../cluster/GridClusterStateProcessor.java | 5 +- .../ignite/testframework/GridTestUtils.java | 9 + .../processors/query/h2/IgniteH2Indexing.java | 2 +- .../GridCacheDatabaseSharedManager.java | 273 ++++++++-------- .../cache/database/GridCacheOffheapManager.java | 315 +++++++++++-------- .../database/file/FilePageStoreManager.java | 187 +++++++---- .../cache/database/pagemem/PageMemoryEx.java | 8 +- .../cache/database/pagemem/PageMemoryImpl.java | 8 +- .../wal/serializer/RecordV1Serializer.java | 6 +- .../IgnitePersistentStoreCacheGroupsTest.java | 161 ++++++++++ .../database/pagemem/NoOpPageStoreManager.java | 22 +- 33 files changed, 875 insertions(+), 553 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 0453ecb..503f4bae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -17,11 +17,12 @@ package org.apache.ignite.internal.pagemem.store; -import java.util.Set; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheSharedManager; import java.nio.ByteBuffer; @@ -44,39 +45,40 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh /** * Callback called when a cache is starting. * + * @param grpDesc Cache group descriptor. * @param ccfg Cache configuration of the cache being started. * @throws IgniteCheckedException If failed to handle cache start callback. */ - public void initializeForCache(CacheConfiguration ccfg) throws IgniteCheckedException; + public void initializeForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException; /** * Callback called when a cache is stopping. After this callback is invoked, no data associated with * the given cache will be stored on disk. * - * @param cacheCtx Cache context of the cache being stopped. + * @param grp Cache group being stopped. * @param destroy Flag indicating if the cache is being destroyed and data should be cleaned. * @throws IgniteCheckedException If failed to handle cache destroy callback. */ - public void shutdownForCache(GridCacheContext cacheCtx, boolean destroy) throws IgniteCheckedException; + public void shutdownForCacheGroup(CacheGroupInfrastructure grp, boolean destroy) throws IgniteCheckedException; /** * Callback called when a partition is created on the local node. * - * @param cacheId Cache ID where the partition is being created. + * @param grpId Cache group ID where the partition is being created. * @param partId ID of the partition being created. * @throws IgniteCheckedException If failed to handle partition create callback. */ - public void onPartitionCreated(int cacheId, int partId) throws IgniteCheckedException; + public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException; /** * Callback called when a partition for the given cache is evicted from the local node. * After this callback is invoked, no data associated with the partition will be stored on disk. * - * @param cacheId Cache ID of the evicted partition. + * @param grpId Cache group ID of the evicted partition. * @param partId Partition ID. * @throws IgniteCheckedException If failed to handle partition destroy callback. */ - public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException; + public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException; /** * Reads a page for the given cache ID. Cache ID may be {@code 0} if the page is a meta page. @@ -174,19 +176,14 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh public long metaPageId(int cacheId); /** - * @return set of cache names which configurations were saved - */ - public Set<String> savedCacheNames(); - - /** - * @param cacheName Cache name. - * @return saved configuration for cache + * @return Saved cache configurations. + * @throws IgniteCheckedException If failed. */ - public CacheConfiguration readConfiguration(String cacheName); + public Map<String, CacheConfiguration> readCacheConfigurations() throws IgniteCheckedException; /** - * @param cacheId Cache ID. - * @return {@code True} if index store for given cache existed before node started. + * @param grpId Cache group ID. + * @return {@code True} if index store for given cache group existed before node started. */ - public boolean hasIndexStore(int cacheId); + public boolean hasIndexStore(int grpId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java index 7aaf1c5..cfcd62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CheckpointRecord.java @@ -35,7 +35,7 @@ public class CheckpointRecord extends WALRecord { private boolean end; /** */ - private Map<Integer, CacheState> cacheStates; + private Map<Integer, CacheState> cacheGrpStates; /** Safe replay pointer. */ private WALPointer cpMark; @@ -65,28 +65,28 @@ public class CheckpointRecord extends WALRecord { } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param state Cache state. */ - public void addCacheState(int cacheId, CacheState state) { - if (cacheStates == null) - cacheStates = new HashMap<>(); + public void addCacheGroupState(int grpId, CacheState state) { + if (cacheGrpStates == null) + cacheGrpStates = new HashMap<>(); - cacheStates.put(cacheId, state); + cacheGrpStates.put(grpId, state); } /** - * @param cacheStates Cache states. + * @param cacheGrpStates Cache states. */ - public void cacheStates(Map<Integer, CacheState> cacheStates) { - this.cacheStates = cacheStates; + public void cacheGroupStates(Map<Integer, CacheState> cacheGrpStates) { + this.cacheGrpStates = cacheGrpStates; } /** * @return Cache states. */ - public Map<Integer, CacheState> cacheStates() { - return cacheStates != null ? cacheStates : Collections.<Integer, CacheState>emptyMap(); + public Map<Integer, CacheState> cacheGroupStates() { + return cacheGrpStates != null ? cacheGrpStates : Collections.<Integer, CacheState>emptyMap(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 1fc548e..18f166a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -617,6 +618,20 @@ public class CacheGroupInfrastructure { } /** + * @return IDs of caches in this group. + */ + public Set<Integer> cacheIds() { + synchronized (caches) { + Set<Integer> ids = U.newHashSet(caches.size()); + + for (int i = 0; i < caches.size(); i++) + ids.add(caches.get(i).cacheId()); + + return ids; + } + } + + /** * @return {@code True} if group contains caches. */ boolean hasCaches() { @@ -684,8 +699,21 @@ public class CacheGroupInfrastructure { else preldr = new GridCachePreloaderAdapter(this); - // TODO IGNITE-5075 get from plugin. - offheapMgr = new IgniteCacheOffheapManagerImpl(); + if (ctx.kernalContext().config().getPersistenceConfiguration() != null) { + ClassLoader clsLdr = U.gridClassLoader(); + + try { + offheapMgr = (IgniteCacheOffheapManager) clsLdr + .loadClass("org.apache.ignite.internal.processors.cache.database.GridCacheOffheapManager") + .getConstructor() + .newInstance(); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to initialize offheap manager", e); + } + } + else + offheapMgr = new IgniteCacheOffheapManagerImpl(); offheapMgr.start(ctx, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f0317ce..5e23995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3195,7 +3195,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { checkObsolete(); - cctx.offheap().updateIndexes(key, localPartition()); + if (cctx.queries().enabled()) + cctx.offheap().updateIndexes(cctx, key, localPartition()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 976b843..ecdb0b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1266,7 +1266,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = grp.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), cacheId)) != null; + updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId)) != null; } if (!cctx.kernalContext().clientNode() && updated) @@ -1274,9 +1274,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean hasMovingParts = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started() && cacheCtx.topology().hasMovingPartitions()) { + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal() && grp.topology().hasMovingPartitions()) { hasMovingParts = true; + break; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 7dd457b..3899cc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -758,22 +758,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert !ctx.config().isDaemon(); if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { - Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); + Map<String, CacheConfiguration> ccfgs = sharedCtx.pageStore().readCacheConfigurations(); - savedCacheNames.removeAll(caches.keySet()); + for (String cache : caches.keySet()) + ccfgs.remove(cache); - savedCacheNames.removeAll(internalCaches); + for (String cache : internalCaches) + ccfgs.remove(cache); - if (!F.isEmpty(savedCacheNames)) { + if (!F.isEmpty(ccfgs)) { if (log.isInfoEnabled()) - log.info("Register persistent caches: " + savedCacheNames); + log.info("Register persistent caches: " + ccfgs.keySet()); - for (String name : savedCacheNames) { - CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); - - if (cfg != null) - addCacheOnJoin(cfg, caches, templates); - } + for (CacheConfiguration ccfg : ccfgs.values()) + addCacheOnJoin(ccfg, caches, templates); } } } @@ -1177,18 +1175,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param grpDesc Cache group descriptor. * @param cache Cache to start. * @param schema Cache schema. * @throws IgniteCheckedException If failed to start cache. */ @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) - private void startCache(GridCacheAdapter<?, ?> cache, QuerySchema schema) throws IgniteCheckedException { + private void startCache(CacheGroupDescriptor grpDesc, GridCacheAdapter<?, ?> cache, QuerySchema schema) throws IgniteCheckedException { GridCacheContext<?, ?> cacheCtx = cache.context(); ctx.continuous().onCacheStart(cacheCtx); if (sharedCtx.pageStore() != null && !ctx.clientNode()) - sharedCtx.pageStore().initializeForCache(cacheCtx.config()); + sharedCtx.pageStore().initializeForCache(grpDesc, cacheCtx.config()); CacheConfiguration cfg = cacheCtx.config(); @@ -1926,7 +1925,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { caches.put(cacheCtx.name(), cache); - startCache(cache, schema != null ? schema : new QuerySchema()); + startCache(grpDesc, cache, schema != null ? schema : new QuerySchema()); grp.onCacheStarted(cacheCtx); @@ -2080,7 +2079,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } if (exchActions != null && err == null) { - Collection<IgniteBiTuple<GridCacheContext, Boolean>> stopped = null; + Collection<IgniteBiTuple<CacheGroupInfrastructure, Boolean>> stopped = null; GridCacheContext<?, ?> stopCtx = null; boolean destroy = false; @@ -2102,7 +2101,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (stopped == null) stopped = new ArrayList<>(); - stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy)); + stopped.add(F.t(stopCtx.group(), destroy)); } } @@ -2123,7 +2122,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { jCacheProxies.putIfAbsent(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false)); } else { - if (req.restart()) + if (req.request().restart()) proxy.restart(); proxy.context().gate().onStopped(); @@ -2131,9 +2130,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.database().checkpointReadLock(); try { - stopCtx = prepareCacheStop(req); + stopCtx = prepareCacheStop(req.request()); - destroy = req.destroy(); + destroy = req.request().destroy(); if (stopCtx != null && !stopCtx.group().hasCaches()) stopCacheGroup(stopCtx.groupId()); @@ -2145,16 +2144,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + // TODO IGNITE-5075 group descriptors. if (stopCtx != null) { if (stopped == null) stopped = new ArrayList<>(); - stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy)); + stopped.add(F.t(stopCtx.group(), destroy)); } } if (stopped != null && !sharedCtx.kernalContext().clientNode()) - sharedCtx.database().onCachesStopped(stopped); + sharedCtx.database().onCacheGroupsStopped(stopped); } } @@ -2668,17 +2668,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!ctx.config().isDaemon() && sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { - Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); + Map<String, CacheConfiguration> savedCaches = sharedCtx.pageStore().readCacheConfigurations(); - for (String name : savedCacheNames) { - CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); - - if (cfg != null) - reqs.add(createRequest(cfg, false)); - } + for (CacheConfiguration cfg : savedCaches.values()) + reqs.add(createRequest(cfg, false)); for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { - if (!savedCacheNames.contains(cfg.getName())) + if (!savedCaches.containsKey(cfg.getName())) reqs.add(createRequest(cfg, true)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 0e3869c..e7e6aec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -97,7 +97,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** * @return {@code True} if eager ttl is enabled for cache. */ - boolean eagerTtlEnabled() { + public boolean eagerTtlEnabled() { return eagerTtlEnabled; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 7ea1f9a..84b5e85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -182,11 +182,13 @@ public interface IgniteCacheOffheapManager { ) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @param part Partition. * @throws IgniteCheckedException If failed. */ public void updateIndexes( + GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part ) throws IgniteCheckedException; @@ -331,6 +333,12 @@ public interface IgniteCacheOffheapManager { public long cacheEntriesCount(int cacheId); /** + * @param part Partition. + * @return Number of entries. + */ + public int totalPartitionEntriesCount(int part); + + /** * */ interface OffheapInvokeClosure extends IgniteTree.InvokeClosure<CacheDataRow> { @@ -427,10 +435,11 @@ public interface IgniteCacheOffheapManager { @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** + * @param cctx Cache context. * @param key Key. * @throws IgniteCheckedException If failed. */ - void updateIndexes(KeyCacheObject key) throws IgniteCheckedException; + void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; /** * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index db08801..086d65d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -96,9 +96,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager private CacheDataStore locCacheDataStore; /** */ - private boolean indexingEnabled; - - /** */ protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>(); /** */ @@ -130,8 +127,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager this.grp = grp; this.log = ctx.logger(getClass()); - indexingEnabled = QueryUtils.isEnabled(cctx.config()); - updateValSizeThreshold = ctx.database().pageSize() / 2; if (grp.affinityNode()) { @@ -176,7 +171,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public void stopCache(int cacheId, final boolean destroy) { if (destroy && grp.affinityNode()) - destroyCacheDataStructures(cacheId, destroy); + removeCacheData(cacheId); } /** {@inheritDoc} */ @@ -201,7 +196,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** * */ - protected void destroyCacheDataStructures(int cacheId, boolean destroy) { + private void removeCacheData(int cacheId) { assert grp.affinityNode(); try { @@ -253,6 +248,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return size; } + /** {@inheritDoc} */ + @Override public int totalPartitionEntriesCount(int p) { + if (grp.isLocal()) + return locCacheDataStore.fullSize(); + else { + GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true); + + return part != null ? part.dataStore().fullSize() : 0; + } + } + /** * @param p Partition. * @return Partition data. @@ -359,8 +365,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void updateIndexes(KeyCacheObject key, GridDhtLocalPartition part) throws IgniteCheckedException { - dataStore(part).updateIndexes(key); + @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part) + throws IgniteCheckedException { + dataStore(part).updateIndexes(cctx, key); } /** {@inheritDoc} */ @@ -1261,23 +1268,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void updateIndexes(KeyCacheObject key) throws IgniteCheckedException { - if (indexingEnabled) { - CacheDataRow row = dataTree.findOne(new SearchRow(key)); + @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { + int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + + CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); + + if (row != null) { + row.key(key); GridCacheQueryManager qryMgr = cctx.queries(); - if (row != null) { - qryMgr.store( - key, - partId, - null, - null, - row.value(), - row.version(), - row.expireTime(), - row.link()); - } + qryMgr.store( + key, + partId, + null, + null, + row.value(), + row.version(), + row.expireTime(), + row.link()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index aef39a7..75909d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -131,11 +131,14 @@ public class CacheDataRowAdapter implements CacheDataRow { do { final long pageId = pageId(nextLink); - // TODO IGNITE-5075 Here must be "physical" cache ID (aka group ID) - final long page = pageMem.acquirePage(cacheId, pageId); + assert grp != null || !sharedCtx.database().persistenceEnabled(); + + int grpId = grp != null ? grp.groupId() : 0; + + final long page = pageMem.acquirePage(grpId, pageId); try { - long pageAddr = pageMem.readLock(cacheId, pageId, page); // Non-empty data page must not be recycled. + long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled. assert pageAddr != 0L : nextLink; @@ -172,11 +175,11 @@ public class CacheDataRowAdapter implements CacheDataRow { return; } finally { - pageMem.readUnlock(cacheId, pageId, page); + pageMem.readUnlock(grpId, pageId, page); } } finally { - pageMem.releasePage(cacheId, pageId, page); + pageMem.releasePage(grpId, pageId, page); } } while(nextLink != 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 9049a81..4cad942 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; +import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -647,9 +648,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** - * @param stoppedCtxs A collection of tuples (cache context, destroy flag). + * @param stoppedGrps A collection of tuples (cache group, destroy flag). */ - public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) { + public void onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupInfrastructure, Boolean>> stoppedGrps) { // No-op. } @@ -685,12 +686,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * Reserve update history for preloading. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param partId Partition Id. * @param cntr Update counter. * @return True if successfully reserved. */ - public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) { + public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java index 5b87cf7..91957db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java @@ -64,49 +64,49 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter { * */ public void restoreState() throws IgniteCheckedException { - + // No-op. } /** * */ public void onCheckPointBegin() { - + // No-op. } /** * */ public void beforeCheckpointPageWritten() { - + // No-op. } /** * */ public void afterCheckpointPageWritten() { - + // No-op. } /** * @param fullId Full id. */ public void beforePageWrite(FullPageId fullId) { - + // No-op. } /** * @param fullId Full id. */ public void onPageWrite(FullPageId fullId, ByteBuffer tmpWriteBuf) { - + // No-op. } /** * @param cctx Cctx. */ public void onCacheStop(GridCacheContext cctx) { - + // No-op. } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java index 91fed4c..c21b818 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java @@ -29,7 +29,7 @@ public interface MetaStore { * @param idxName Index name. * @return {@link RootPage} that keeps pageId, allocated flag that shows whether the page * was newly allocated, and rootId that is counter which increments each time new page allocated. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ public RootPage getOrAllocateForTree(String idxName) throws IgniteCheckedException; @@ -38,14 +38,14 @@ public interface MetaStore { * * @param idxName Index name. * @return Root ID or -1 if no page was removed. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ public RootPage dropRootPage(String idxName) throws IgniteCheckedException; /** * Destroy this meta store. * - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ public void destroy() throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java index 45ddeb4..b6868dc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java @@ -21,6 +21,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; /** @@ -31,11 +32,16 @@ public class RowStore { private final FreeList freeList; /** */ + private final GridCacheSharedContext ctx; + + /** */ protected final PageMemory pageMem; /** */ protected final CacheObjectContext coctx; + + /** * @param grp Cache group. * @param freeList Free list. @@ -46,6 +52,7 @@ public class RowStore { this.freeList = freeList; + ctx = grp.shared(); coctx = grp.cacheObjectContext(); pageMem = grp.memoryPolicy().pageMemory(); } @@ -56,13 +63,13 @@ public class RowStore { */ public void removeRow(long link) throws IgniteCheckedException { assert link != 0; - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { freeList.removeDataRowByLink(link); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } @@ -71,13 +78,13 @@ public class RowStore { * @throws IgniteCheckedException If failed. */ public void addRow(CacheDataRow row) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { freeList.insertDataRow(row); } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f006c83..9c0e5c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -71,7 +71,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Partition topology. */ @GridToStringExclude -publicclass GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { +public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -152,7 +152,7 @@ publicclass GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); - part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f); + part2node = new HashMap<>(grp.affinityFunction().partitions(), 1.0f); } /** {@inheritDoc} */ @@ -1571,7 +1571,7 @@ publicclass GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locPart.reload(true); - result.add(cctx.localNodeId()); + result.add(ctx.localNodeId()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 485baee..2514130 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -298,17 +298,18 @@ public class GridDhtPartitionDemander { fut.sendRebalanceStartedEvent(); - final boolean statsEnabled = cctx.config().isStatisticsEnabled(); - - if (statsEnabled) { - cctx.cache().metrics0().clearRebalanceCounters(); - - rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - cctx.cache().metrics0().clearRebalanceCounters(); - } - }); - } +// TODO 5075. +// final boolean statsEnabled = cctx.config().isStatisticsEnabled(); +// +// if (statsEnabled) { +// cctx.cache().metrics0().clearRebalanceCounters(); +// +// rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { +// @Override public void apply(IgniteInternalFuture<Boolean> fut) { +// cctx.cache().metrics0().clearRebalanceCounters(); +// } +// }); +// } if (assigns.cancelled()) { // Pending exchange. if (log.isDebugEnabled()) @@ -608,14 +609,15 @@ public class GridDhtPartitionDemander { final GridDhtPartitionTopology top = grp.topology(); - final boolean statsEnabled = cctx.config().isStatisticsEnabled(); - - if (statsEnabled) { - if (supply.estimatedKeysCount() != -1) - cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); - - cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); - } +// TODO 5075. +// final boolean statsEnabled = cctx.config().isStatisticsEnabled(); +// +// if (statsEnabled) { +// if (supply.estimatedKeysCount() != -1) +// cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); +// +// cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); +// } try { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); @@ -659,8 +661,9 @@ public class GridDhtPartitionDemander { break; } - if (statsEnabled) - cctx.cache().metrics0().onRebalanceKeyReceived(); +// TODO 5075. +// if (statsEnabled) +// cctx.cache().metrics0().onRebalanceKeyReceived(); } // If message was last for this partition, http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 0c907f5..a7ae3c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -256,7 +256,7 @@ class GridDhtPartitionSupplier { if (loc == null || loc.state() != OWNING) continue; - keysCnt += cctx.offheap().entriesCount(part); + keysCnt += grp.offheap().totalPartitionEntriesCount(part); } s.estimatedKeysCount(keysCnt); @@ -304,12 +304,12 @@ class GridDhtPartitionSupplier { d.isHistorical(part) ? d.partitionCounter(part) : null); if (!iter.historical()) { - assert !cctx.shared().database().persistenceEnabled() || !d.isHistorical(part); + assert !grp.shared().database().persistenceEnabled() || !d.isHistorical(part); s.clean(part); } else - assert cctx.shared().database().persistenceEnabled() && d.isHistorical(part); + assert grp.shared().database().persistenceEnabled() && d.isHistorical(part); } else iter = (IgniteRebalanceIterator)sctx.entryIt; http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 9f66491..92c462b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -283,46 +283,47 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple writer.incrementState(); case 4: - if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) + if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) + if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 8: - if (!writer.writeLong("updateSeq", updateSeq)) + if (!writer.writeInt("msgSize", msgSize)) return false; writer.incrementState(); case 9: - if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 10: - if (!writer.writeInt("msgSize", msgSize)) + if (!writer.writeLong("updateSeq", updateSeq)) return false; writer.incrementState(); + } return true; @@ -348,7 +349,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 4: - infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); + estimatedKeysCnt = reader.readLong("estimatedKeysCnt"); if (!reader.isLastRead()) return false; @@ -356,7 +357,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 5: - last = reader.readCollection("last", MessageCollectionItemType.INT); + infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -364,7 +365,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 6: - missed = reader.readCollection("missed", MessageCollectionItemType.INT); + last = reader.readCollection("last", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -372,7 +373,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 7: - topVer = reader.readMessage("topVer"); + missed = reader.readCollection("missed", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -380,7 +381,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 8: - updateSeq = reader.readLong("updateSeq"); + msgSize = reader.readInt("msgSize"); if (!reader.isLastRead()) return false; @@ -388,7 +389,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 9: - estimatedKeysCnt = reader.readLong("estimatedKeysCnt"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -396,12 +397,13 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 10: - msgSize = reader.readInt("msgSize"); + updateSeq = reader.readLong("updateSeq"); if (!reader.isLastRead()) return false; reader.incrementState(); + } return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 65edd96..80c4dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -238,6 +238,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private volatile IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); + /** */ private final AtomicBoolean done = new AtomicBoolean(); /** @@ -1713,7 +1714,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; - Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.cacheId()) : null; + Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null; Set<Integer> haveHistory = new HashSet<>(); @@ -1734,7 +1735,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (localCntr != null && localCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) { - partHistSuppliers.put(cctx.localNodeId(), top.cacheId(), p, minCntr); + partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, minCntr); haveHistory.add(p); @@ -1743,10 +1744,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) { - Long histCntr = e0.getValue().partitionHistoryCounters(top.cacheId()).get(p); + Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p); if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) { - partHistSuppliers.put(e0.getKey(), top.cacheId(), p, minCntr); + partHistSuppliers.put(e0.getKey(), top.groupId(), p, minCntr); haveHistory.add(p); @@ -1767,7 +1768,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, haveHistory.contains(p), entryLeft == 0); for (UUID nodeId : nodesToReload) - partsToReload.put(nodeId, top.cacheId(), p); + partsToReload.put(nodeId, top.groupId(), p); } } @@ -2036,7 +2037,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (grp != null) grp.topology().update(this, entry.getValue(), cntrMap, - msg.partsToReload(cctx.localNodeId(), cacheId)); + msg.partsToReload(cctx.localNodeId(), grpId)); else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index b64a58c..2f8a531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -213,11 +213,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return partHistSuppliers; } - public Set<Integer> partsToReload(UUID nodeId, int cacheId) { + public Set<Integer> partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); - return partsToReload.get(nodeId, cacheId); + return partsToReload.get(nodeId, grpId); } /** @@ -432,13 +432,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa writer.incrementState(); - case 11: + case 10: if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) return false; writer.incrementState(); - case 12: + case 11: if (!writer.writeMessage("topVer", topVer)) return false; @@ -500,7 +500,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 11: + case 10: partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); if (!reader.isLastRead()) @@ -508,7 +508,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 12: + case 11: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -528,7 +528,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9e399f1..1e5ea14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -172,35 +172,34 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition history counters. */ - public void partitionHistoryCounters(int cacheId, Map<Integer, Long> cntrMap) { + public void partitionHistoryCounters(int grpId, Map<Integer, Long> cntrMap) { if (cntrMap.isEmpty()) return; if (partHistCntrs == null) partHistCntrs = new HashMap<>(); - partHistCntrs.put(cacheId, cntrMap); + partHistCntrs.put(grpId, cntrMap); } /** * @param cntrMap Partition history counters. */ - public void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) { - for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) { + void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) { + for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) partitionHistoryCounters(e.getKey(), e.getValue()); - } } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition history counters. */ - public Map<Integer, Long> partitionHistoryCounters(int cacheId) { + Map<Integer, Long> partitionHistoryCounters(int grpId) { if (partHistCntrs != null) { - Map<Integer, Long> res = partHistCntrs.get(cacheId); + Map<Integer, Long> res = partHistCntrs.get(grpId); return res != null ? res : Collections.<Integer, Long>emptyMap(); } @@ -386,7 +385,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); - case 11: + case 10: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -448,7 +447,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 11: + case 10: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -468,7 +467,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 5e84b3e..c700ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -26,9 +26,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; @@ -50,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -219,11 +222,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { ClusterNode histSupplier = null; - if (cctx.shared().database().persistenceEnabled()) { - UUID nodeId = exchFut.partitionHistorySupplier(cctx.cacheId(), p); + if (ctx.database().persistenceEnabled()) { + UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p); if (nodeId != null) - histSupplier = cctx.discovery().node(nodeId); + histSupplier = ctx.discovery().node(nodeId); } if (histSupplier != null) { @@ -234,7 +237,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { continue; // For. } - assert cctx.shared().database().persistenceEnabled(); + assert ctx.database().persistenceEnabled(); assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p, topVer); GridDhtPartitionDemandMessage msg = assigns.get(histSupplier); @@ -243,13 +246,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), exchFut.exchangeId().topologyVersion(), - cctx.cacheId())); + grp.groupId())); } msg.addPartition(p, true); } else { - if (cctx.shared().database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled()) { if (part.state() == RENTING || part.state() == EVICTED) { try { part.rent(false).get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 5dd3000..a7fcd31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -452,16 +452,17 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { sharedCtx.database().initDataBase(); + // TODO IGNITE-5075 group descriptors. for (CacheConfiguration cfg : cfgs) { if (CU.isSystemCache(cfg.getName())) if (pageStore != null) - pageStore.initializeForCache(cfg); + pageStore.initializeForCache(ctx.cache().cacheDescriptors().get(cfg.getName()).groupDescriptor(), cfg); } for (CacheConfiguration cfg : cfgs) { if (!CU.isSystemCache(cfg.getName())) if (pageStore != null) - pageStore.initializeForCache(cfg); + pageStore.initializeForCache(ctx.cache().cacheDescriptors().get(cfg.getName()).groupDescriptor(), cfg); } sharedCtx.database().onActivate(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index 1a821a1..ff82123 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -100,6 +100,8 @@ import org.apache.ignite.testframework.config.GridTestProperties; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.springframework.util.FileSystemUtils.deleteRecursively; + /** * Utility class for tests. */ @@ -1874,4 +1876,11 @@ public final class GridTestUtils { return b.toString(); } + + /** + * @throws Exception If failed. + */ + public static void deleteDbFiles() throws Exception { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 5f16fdd..81ab7a7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -2129,7 +2129,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); - IgniteCacheOffheapManager offheapMgr = cctx.isNear() ? cctx.near().dht().context().offheap() : cctx.offheap(); + IgniteCacheOffheapManager offheapMgr = cctx.group().offheap(); for (int p = 0; p < cctx.affinity().partitions(); p++) { try (GridCloseableIterator<KeyCacheObject> keyIter = offheapMgr.keysIterator(p)) {
