Repository: ignite Updated Branches: refs/heads/ignite-5075 [created] e6927e577
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6927e57 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6927e57 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6927e57 Branch: refs/heads/ignite-5075 Commit: e6927e577602fbaf5aa6126fb3214611514c15ce Parents: 1dc9e69 Author: sboikov <[email protected]> Authored: Tue Apr 25 18:07:22 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Apr 25 18:07:22 2017 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 12 ++ .../org/apache/ignite/internal/IgnitionEx.java | 6 +- .../affinity/GridAffinityAssignmentCache.java | 39 ++-- .../cache/CacheAffinitySharedManager.java | 208 ++++++++++--------- .../cache/CacheGroupInfrastructure.java | 132 ++++++++++++ .../cache/DynamicCacheChangeRequest.java | 11 + .../cache/DynamicCacheDescriptor.java | 9 + .../cache/GridCacheAffinityManager.java | 25 +-- .../processors/cache/GridCacheContext.java | 13 +- .../processors/cache/GridCacheProcessor.java | 133 +++++++++--- .../dht/preloader/GridDhtPreloader.java | 2 +- .../cluster/GridClusterStateProcessor.java | 3 +- .../datastructures/DataStructuresProcessor.java | 4 + .../service/GridServiceProcessor.java | 4 + .../processors/cache/IgniteCacheGroupsTest.java | 157 ++++++++++++++ .../loadtests/hashmap/GridCacheTestContext.java | 1 + 16 files changed, 589 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index d378343..a92cb8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -213,6 +213,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Cache name. */ private String name; + /** */ + private String grpName; + /** Name of {@link MemoryPolicyConfiguration} for this cache */ private String memPlcName; @@ -421,6 +424,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { evictFilter = cc.getEvictionFilter(); evictPlc = cc.getEvictionPolicy(); expiryPolicyFactory = cc.getExpiryPolicyFactory(); + grpName = cc.getGroupName(); indexedTypes = cc.getIndexedTypes(); interceptor = cc.getInterceptor(); invalidate = cc.isInvalidate(); @@ -468,6 +472,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { writeSync = cc.getWriteSynchronizationMode(); } + public String getGroupName() { + return grpName; + } + + public void setGroupName(String groupName) { + this.grpName = groupName; + } + /** * Cache name or {@code null} if not provided, then this will be considered a default * cache which can be accessed via {@link Ignite#cache(String)} method. Otherwise, if name http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 4b34891..65570ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -2196,12 +2196,14 @@ public class IgnitionEx { public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException { List<CacheConfiguration> cacheCfgs = new ArrayList<>(); - cacheCfgs.add(utilitySystemCache()); + // TODO IGNITE-5075. + //cacheCfgs.add(utilitySystemCache()); if (IgniteComponentType.HADOOP.inClassPath()) cacheCfgs.add(CU.hadoopSystemCache()); - cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration())); + // TODO IGNITE-5075. + //cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration())); CacheConfiguration[] userCaches = cfg.getCacheConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 2399493..7867e52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -60,11 +60,11 @@ public class GridAffinityAssignmentCache { /** Cleanup history size. */ private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500); - /** Cache name. */ - private final String cacheName; + /** Group name. */ + private final String grpName; /** */ - private final int cacheId; + private final int grpId; /** Number of backups. */ private final int backups; @@ -115,7 +115,8 @@ public class GridAffinityAssignmentCache { * Constructs affinity cached calculations. * * @param ctx Kernal context. - * @param cacheName Cache name. + * @param grpName Cache group name. + * @param grpId Group ID. * @param aff Affinity function. * @param nodeFilter Node filter. * @param backups Number of backups. @@ -123,7 +124,8 @@ public class GridAffinityAssignmentCache { */ @SuppressWarnings("unchecked") public GridAffinityAssignmentCache(GridKernalContext ctx, - String cacheName, + String grpName, + int grpId, AffinityFunction aff, IgnitePredicate<ClusterNode> nodeFilter, int backups, @@ -132,16 +134,16 @@ public class GridAffinityAssignmentCache { assert ctx != null; assert aff != null; assert nodeFilter != null; + assert grpId != 0; this.ctx = ctx; this.aff = aff; this.nodeFilter = nodeFilter; - this.cacheName = cacheName; + this.grpName = grpName; + this.grpId = grpId; this.backups = backups; this.locCache = locCache; - cacheId = CU.cacheId(cacheName); - log = ctx.log(GridAffinityAssignmentCache.class); partsCnt = aff.partitions(); @@ -160,18 +162,12 @@ public class GridAffinityAssignmentCache { return similarAffKey; } - /** - * @return Cache name. - */ - public String cacheName() { - return cacheName; + public String groupName() { + return grpName; } - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; + public int groupId() { + return 0; } /** @@ -432,7 +428,7 @@ public class GridAffinityAssignmentCache { */ public void dumpDebugInfo() { if (!readyFuts.isEmpty()) { - U.warn(log, "Pending affinity ready futures [cache=" + cacheName + ", lastVer=" + lastVersion() + "]:"); + U.warn(log, "Pending affinity ready futures [grp=" + grpName + ", lastVer=" + lastVersion() + "]:"); for (AffinityReadyFuture fut : readyFuts.values()) U.warn(log, ">>> " + fut); @@ -461,7 +457,7 @@ public class GridAffinityAssignmentCache { if (cache == null) { throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + "calculated [locNode=" + ctx.discovery().localNode() + - ", cache=" + cacheName + + ", group=" + grpName + ", topVer=" + topVer + ", head=" + head.get().topologyVersion() + ", history=" + affCache.keySet() + @@ -590,9 +586,6 @@ public class GridAffinityAssignmentCache { */ private class AffinityReadyFuture extends GridFutureAdapter<AffinityTopologyVersion> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private AffinityTopologyVersion reqTopVer; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 0958208..b221e6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -72,7 +72,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap private boolean lateAffAssign; /** Affinity information for all started caches (initialized on coordinator). */ - private ConcurrentMap<Integer, CacheHolder> caches = new ConcurrentHashMap<>(); + private ConcurrentMap<Integer, CacheGroupHolder> cacheGrps = new ConcurrentHashMap<>(); /** Last topology version when affinity was calculated (updated from exchange thread). */ private AffinityTopologyVersion affCalcVer; @@ -154,7 +154,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (msg.exchangeId() != null) { if (log.isDebugEnabled()) { - log.debug("Need process affinity change message [lastAffVer=" + lastAffVer + + log.debug("Ignore affinity change message [lastAffVer=" + lastAffVer + ", msgExchId=" + msg.exchangeId() + ", msgVer=" + msg.topologyVersion() + ']'); } @@ -234,7 +234,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean rebalanced = true; if (partWait != null) { - CacheHolder cache = caches.get(checkCacheId); + CacheGroupHolder cache = cacheGrps.get(checkCacheId); if (cache != null) { for (Iterator<Map.Entry<Integer, UUID>> it = partWait.entrySet().iterator(); it.hasNext(); ) { @@ -308,14 +308,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap public void onCacheCreated(GridCacheContext cctx) { final Integer cacheId = cctx.cacheId(); - if (!caches.containsKey(cctx.cacheId())) { - cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class, - new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { - @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { - processAffinityAssignmentResponse(cacheId, nodeId, res); - } - }); - } + // TODO IGNITE-5075: move to group initialization? +// if (!caches.containsKey(cctx.cacheId())) { +// cctx.io().addHandler(cacheId, GridDhtAffinityAssignmentResponse.class, +// new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>() { +// @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentResponse res) { +// processAffinityAssignmentResponse(cacheId, nodeId, res); +// } +// }); +// } } /** @@ -345,6 +346,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cctx.kernalContext(), req.startCacheConfiguration(), req.cacheType(), + req.cacheDescriptor().groupId(), false, req.deploymentId(), req.schema()); @@ -358,7 +360,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean clientOnly = true; // Affinity did not change for existing caches. - forAllCaches(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (fut.stopping(aff.cacheId())) return; @@ -407,7 +409,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } else - initStartedCacheOnCoordinator(fut, cacheId); + initStartedCacheOnCoordinator(fut, req.cacheDescriptor()); } else if (req.stop() || req.close()) { cctx.cache().blockGateway(req); @@ -424,28 +426,29 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap rmvCache = true; if (rmvCache) { - CacheHolder cache = caches.remove(cacheId); - - if (cache != null) { - if (!req.stop()) { - assert !cache.client(); - - cache = CacheHolder2.create(cctx, - cctx.cache().cacheDescriptor(cacheId), - fut, - cache.affinity()); - - caches.put(cacheId, cache); - } - else { - if (stoppedCaches == null) - stoppedCaches = new HashSet<>(); - - stoppedCaches.add(cache.cacheId()); - - cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class); - } - } + // TODO IGNITE-5075. +// CacheHolder cache = caches.remove(cacheId); +// +// if (cache != null) { +// if (!req.stop()) { +// assert !cache.client(); +// +// cache = CacheHolder2.create(cctx, +// cctx.cache().cacheDescriptor(cacheId), +// fut, +// cache.affinity()); +// +// caches.put(cacheId, cache); +// } +// else { +// if (stoppedCaches == null) +// stoppedCaches = new HashSet<>(); +// +// stoppedCaches.add(cache.cacheId()); +// +// cctx.io().removeHandler(cacheId, GridDhtAffinityAssignmentResponse.class); +// } +// } } } } @@ -486,7 +489,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * */ public void removeAllCacheInfo(){ - caches.clear(); + cacheGrps.clear(); registeredCaches.clear(); } @@ -516,7 +519,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); - forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { List<List<ClusterNode>> idealAssignment = aff.idealAssignment(); @@ -637,7 +640,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (lateAffAssign) { if (!locJoin) { - forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -650,7 +653,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } else { if (!locJoin) { - forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); @@ -719,56 +722,58 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param crd Coordinator flag. * @param c Closure. */ - private void forAllCaches(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) { + private void forAllCacheGroups(boolean crd, IgniteInClosureX<GridAffinityAssignmentCache> c) { if (crd) { - for (CacheHolder cache : caches.values()) + for (CacheGroupHolder cache : cacheGrps.values()) c.apply(cache.affinity()); } else { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupInfrastructure grp : cctx.kernalContext().cache().cacheGroups()) { + if (grp.isLocal()) continue; - c.apply(cacheCtx.affinity().affinityCache()); + c.apply(grp.affinity()); } } } /** * @param fut Exchange future. - * @param cacheId Cache ID. + * @param desc Cache descriptor. * @throws IgniteCheckedException If failed. */ - private void initStartedCacheOnCoordinator(GridDhtPartitionsExchangeFuture fut, final Integer cacheId) + private void initStartedCacheOnCoordinator(GridDhtPartitionsExchangeFuture fut, final DynamicCacheDescriptor desc) throws IgniteCheckedException { - CacheHolder cache = caches.get(cacheId); + assert desc != null && desc.groupId() != 0 : desc; - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + int grpId = desc.groupId(); - if (cache == null) { - DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId); + CacheGroupHolder grpHolder = cacheGrps.get(desc.groupId()); - assert desc != null : cacheId; + CacheGroupInfrastructure grp = cctx.kernalContext().cache().cacheGroup(grpId); + if (grpHolder == null) { if (desc.cacheConfiguration().getCacheMode() == LOCAL) return; - cache = cacheCtx != null ? new CacheHolder1(cacheCtx, null) : CacheHolder2.create(cctx, desc, fut, null); + grpHolder = grp != null ? new CacheGroupHolder1(grp, null) : CacheGroupHolder2.create(cctx, desc, fut, null); - CacheHolder old = caches.put(cacheId, cache); + CacheGroupHolder old = cacheGrps.put(grpId, grpHolder); assert old == null : old; - List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + List<List<ClusterNode>> newAff = grpHolder.affinity().calculate(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); - cache.affinity().initialize(fut.topologyVersion(), newAff); + grpHolder.affinity().initialize(fut.topologyVersion(), newAff); } - else if (cache.client() && cacheCtx != null) { - assert cache.affinity().idealAssignment() != null; + else if (grpHolder.client() && grp != null) { + assert grp.affinity().idealAssignment() != null; - cache = new CacheHolder1(cacheCtx, cache.affinity()); + grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); - caches.put(cacheId, cache); + cacheGrps.put(grpId, grpHolder); } } @@ -793,7 +798,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (crd && lateAffAssign) { forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() { @Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException { - CacheHolder cache = cache(fut, desc); + CacheGroupHolder cache = cache(fut, desc); if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> assignment = @@ -805,7 +810,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap }); } else { - forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) initAffinity(aff, fut, false); @@ -883,7 +888,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { AffinityTopologyVersion topVer = fut.topologyVersion(); - CacheHolder cache = cache(fut, cacheDesc); + CacheGroupHolder cache = cache(fut, cacheDesc); List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache()); @@ -1271,7 +1276,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap AffinityTopologyVersion affTopVer = aff.lastVersion(); - assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [cache=" + aff.cacheName() + + assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.groupId() + ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; List<List<ClusterNode>> curAff = aff.assignments(affTopVer); @@ -1578,7 +1583,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * */ - abstract static class CacheHolder { + abstract static class CacheGroupHolder { /** */ private final GridAffinityAssignmentCache aff; @@ -1590,7 +1595,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity cache. * @param initAff Existing affinity cache. */ - CacheHolder(boolean rebalanceEnabled, + CacheGroupHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) { this.aff = aff; @@ -1606,13 +1611,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ abstract boolean client(); - /** - * @return Cache ID. - */ - int cacheId() { - return aff.cacheId(); + int groupId() { + return aff.groupId(); } +// /** +// * @return Cache ID. +// */ +// int cacheId() { +// return aff.cacheId(); +// } + /** * @return Partitions number. */ @@ -1620,12 +1629,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return aff.partitions(); } - /** - * @return Cache name. - */ - String name() { - return aff.cacheName(); - } +// /** +// * @return Cache name. +// */ +// String name() { +// return aff.cacheName(); +// } /** * @param fut Exchange future. @@ -1644,20 +1653,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * Created cache is started on coordinator. */ - private class CacheHolder1 extends CacheHolder { + private class CacheGroupHolder1 extends CacheGroupHolder { /** */ - private final GridCacheContext cctx; + private final CacheGroupInfrastructure grp; /** - * @param cctx Cache context. + * @param grp Cache group. * @param initAff Current affinity. */ - CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) { - super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff); + CacheGroupHolder1(CacheGroupInfrastructure grp, @Nullable GridAffinityAssignmentCache initAff) { + super(grp.rebalanceEnabled(), grp.affinity(), initAff); - assert !cctx.isLocal() : cctx.name(); + assert !grp.isLocal() : grp; - this.cctx = cctx; + this.grp = grp; } /** {@inheritDoc} */ @@ -1665,31 +1674,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return false; } - /** {@inheritDoc} */ - @Override public int partitions() { - return cctx.affinity().partitions(); - } - - /** {@inheritDoc} */ - @Override public String name() { - return cctx.name(); - } - - /** {@inheritDoc} */ - @Override public int cacheId() { - return cctx.cacheId(); - } +// /** {@inheritDoc} */ +// @Override public String name() { +// return cctx.name(); +// } +// +// /** {@inheritDoc} */ +// @Override public int cacheId() { +// return cctx.cacheId(); +// } /** {@inheritDoc} */ @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) { - return cctx.topology(); + return grp.topology(); } } /** * Created if cache is not started on coordinator. */ - private static class CacheHolder2 extends CacheHolder { + private static class CacheGroupHolder2 extends CacheGroupHolder { /** */ private final GridCacheSharedContext cctx; @@ -1701,7 +1705,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @return Cache holder. * @throws IgniteCheckedException If failed. */ - static CacheHolder2 create( + static CacheGroupHolder2 create( GridCacheSharedContext cctx, DynamicCacheDescriptor cacheDesc, GridDhtPartitionsExchangeFuture fut, @@ -1730,7 +1734,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ccfg.getBackups(), ccfg.getCacheMode() == LOCAL); - return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff); + return new CacheGroupHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff); } /** @@ -1739,7 +1743,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param aff Affinity. * @param initAff Current affinity. */ - CacheHolder2( + CacheGroupHolder2( boolean rebalanceEnabled, GridCacheSharedContext cctx, GridAffinityAssignmentCache aff, http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java new file mode 100644 index 0000000..4a0b3de --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.lang.IgniteFuture; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.NONE; + +/** + * + */ +public class CacheGroupInfrastructure { + /** */ + private GridAffinityAssignmentCache aff; + + /** */ + private final int id; + + /** */ + private final CacheConfiguration ccfg; + + /** */ + private final GridCacheSharedContext ctx; + + private GridDhtPartitionTopology top; + + /** + * @param ctx Context. + * @param ccfg Cache configuration. + */ + public CacheGroupInfrastructure(int id, GridCacheSharedContext ctx, CacheConfiguration ccfg) { + assert id != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; + assert ccfg != null; + + this.id = id; + this.ctx = ctx; + this.ccfg = ccfg; + } + + /** + * @return {@code True} if cache is local. + */ + public boolean isLocal() { + return ccfg.getCacheMode() == LOCAL; + } + + public CacheConfiguration config() { + return ccfg; + } + + public GridAffinityAssignmentCache affinity() { + return aff; + } + + @Nullable public String groupName() { + return ccfg.getGroupName(); + } + + public int groupId() { + return id; + } + + public boolean sharedGroup() { + return ccfg.getGroupName() != null; + } + + public void start() throws IgniteCheckedException { + aff = new GridAffinityAssignmentCache(ctx.kernalContext(), + groupName(), + id, + ccfg.getAffinity(), + ccfg.getNodeFilter(), + ccfg.getBackups(), + ccfg.getCacheMode() == LOCAL); + } + + /** + * @param reconnectFut + */ + public void onDisconnected(IgniteFuture reconnectFut) { + // TODO IGNITE-5075. + IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Failed to wait for topology update, client disconnected."); + + if (aff != null) + aff.cancelFutures(err); + } + + /** + * @return {@code True} if rebalance is enabled. + */ + public boolean rebalanceEnabled() { + return ccfg.getRebalanceMode() != NONE; + } + + /** + * + */ + public void onReconnected() { + // TODO IGNITE-5075. + aff.onReconnected(); + } + + public GridDhtPartitionTopology topology() { + return top; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 9d2563d..b3067cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -94,6 +94,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** */ private transient AffinityTopologyVersion cacheFutTopVer; + /** */ + private transient DynamicCacheDescriptor cacheDesc; + /** * Constructor creates cache stop request. * @@ -372,6 +375,14 @@ public class DynamicCacheChangeRequest implements Serializable { this.schema = schema != null ? schema.copy() : null; } + public DynamicCacheDescriptor cacheDescriptor() { + return cacheDesc; + } + + public void cacheDescriptor(DynamicCacheDescriptor cacheDesc) { + this.cacheDesc = cacheDesc; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 92a7af3..471335d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -92,6 +92,9 @@ public class DynamicCacheDescriptor { /** Current schema. */ private QuerySchema schema; + /** */ + private int grpId; + /** * @param ctx Context. * @param cacheCfg Cache configuration. @@ -103,6 +106,7 @@ public class DynamicCacheDescriptor { public DynamicCacheDescriptor(GridKernalContext ctx, CacheConfiguration cacheCfg, CacheType cacheType, + int grpId, boolean template, IgniteUuid deploymentId, QuerySchema schema) { @@ -111,6 +115,7 @@ public class DynamicCacheDescriptor { this.cacheCfg = cacheCfg; this.cacheType = cacheType; + this.grpId = grpId; this.template = template; this.deploymentId = deploymentId; @@ -123,6 +128,10 @@ public class DynamicCacheDescriptor { } } + public int groupId() { + return grpId; + } + /** * @return Cache ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 571efc3..56ceefb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -60,15 +60,10 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - affFunction = cctx.config().getAffinity(); - affMapper = cctx.config().getAffinityMapper(); + affFunction = cctx.group().config().getAffinity(); + affMapper = cctx.group().config().getAffinityMapper(); - aff = new GridAffinityAssignmentCache(cctx.kernalContext(), - cctx.namex(), - affFunction, - cctx.config().getNodeFilter(), - cctx.config().getBackups(), - cctx.isLocal()); + aff = cctx.group().affinity(); } /** {@inheritDoc} */ @@ -100,18 +95,20 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture reconnectFut) { - IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, - "Failed to wait for topology update, client disconnected."); - - if (aff != null) - aff.cancelFutures(err); + // TODO IGNITE-5075. +// IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, +// "Failed to wait for topology update, client disconnected."); +// +// if (aff != null) +// aff.cancelFutures(err); } /** * */ public void onReconnected() { - aff.onReconnected(); + // TODO IGNITE-5075. +// aff.onReconnected(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 92c144c..a62badf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -213,6 +213,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cache type. */ private CacheType cacheType; + /** */ + private CacheGroupInfrastructure grp; + /** IO policy. */ private byte plc; @@ -288,6 +291,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridKernalContext ctx, GridCacheSharedContext sharedCtx, CacheConfiguration cacheCfg, + CacheGroupInfrastructure grp, CacheType cacheType, boolean affNode, boolean updatesAllowed, @@ -317,6 +321,8 @@ public class GridCacheContext<K, V> implements Externalizable { assert sharedCtx != null; assert cacheCfg != null; + assert grp != null; + assert evtMgr != null; assert storeMgr != null; assert evictMgr != null; @@ -332,6 +338,7 @@ public class GridCacheContext<K, V> implements Externalizable { this.ctx = ctx; this.sharedCtx = sharedCtx; this.cacheCfg = cacheCfg; + this.grp = grp; this.cacheType = cacheType; this.affNode = affNode; this.updatesAllowed = updatesAllowed; @@ -378,6 +385,10 @@ public class GridCacheContext<K, V> implements Externalizable { itHolder = new CacheWeakQueryIteratorsHolder(log); } + public CacheGroupInfrastructure group() { + return grp; + } + /** * @return {@code True} if custom {@link AffinityKeyMapper} is configured for cache. */ @@ -778,7 +789,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if rebalance is enabled. */ public boolean rebalanceEnabled() { - return cacheCfg.getRebalanceMode() != NONE; + return grp.rebalanceEnabled(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4b79361..abb2652 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -215,6 +215,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Internal cache names. */ private final Set<String> internalCaches; + /** */ + private ConcurrentMap<Integer, CacheGroupInfrastructure> cacheGrps = new ConcurrentHashMap<>(); + /** * @param ctx Kernal context. */ @@ -389,8 +392,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cc Cache Configuration. * @return {@code true} if cache is starting on client node and this node is affinity node for the cache. */ - private boolean storesLocallyOnClient(IgniteConfiguration c, - CacheConfiguration cc) { + private boolean storesLocallyOnClient(IgniteConfiguration c, CacheConfiguration cc) { if (c.isClientMode() && c.getMemoryConfiguration() == null) { if (cc.getCacheMode() == LOCAL) return true; @@ -643,7 +645,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager mgr : sharedCtx.managers()) mgr.start(sharedCtx); - //if inActivate on start then skip registrate caches + //if inActivate on start then skip caches registration. if (!activeOnStart) return; @@ -662,10 +664,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException { - for (int i = 0; i < cfgs.length; i++) { - if (ctx.config().isDaemon()) - continue; + if (ctx.config().isDaemon()) + return; + for (int i = 0; i < cfgs.length; i++) { CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]); cfgs[i] = cfg; // Replace original configuration value. @@ -682,7 +684,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled() && !ctx.config().isDaemon()) { - Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); for (CacheConfiguration cfg : cfgs) @@ -692,7 +693,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { savedCacheNames.remove(name); if (!F.isEmpty(savedCacheNames)) { - log.info("Registrate persistent caches: " + savedCacheNames); + if (log.isInfoEnabled()) + log.info("Register persistent caches: " + savedCacheNames); for (String name : savedCacheNames) { CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); @@ -704,6 +706,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + @Nullable public CacheGroupInfrastructure cacheGroup(int grpId) { + return cacheGrps.get(grpId); + } + + public Collection<CacheGroupInfrastructure> cacheGroups() { + return cacheGrps.values(); + } + + private int nextCacheGroupId(CacheConfiguration ccfg) { + if (ccfg.getGroupName() != null) { + for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { + if (ccfg.getGroupName().equals(cacheDesc.cacheConfiguration().getGroupName())) + return cacheDesc.groupId(); + } + } + + return registeredCaches.size() + 1; + } + /** * @param cfg Cache configuration. * @throws IgniteCheckedException If failed. @@ -731,12 +752,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheType cacheType; - if (CU.isUtilityCache(cfg.getName())) - cacheType = CacheType.UTILITY; - else if (internalCaches.contains(maskNull(cfg.getName()))) - cacheType = CacheType.INTERNAL; - else - cacheType = CacheType.USER; + if (CU.isUtilityCache(cfg.getName())) + cacheType = CacheType.UTILITY; + else if (internalCaches.contains(maskNull(cfg.getName()))) + cacheType = CacheType.INTERNAL; + else + cacheType = CacheType.USER; if (cacheType != CacheType.USER && cfg.getMemoryPolicyName() == null) cfg.setMemoryPolicyName(sharedCtx.database().systemMemoryPolicyName()); @@ -746,6 +767,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, + 0, template, IgniteUuid.randomUuid(), new QuerySchema(cfg.getQueryEntities())); @@ -783,6 +805,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, cfg, cacheType, + 0, true, IgniteUuid.randomUuid(), new QuerySchema(cfg.getQueryEntities())); @@ -886,7 +909,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CachePluginManager pluginMgr = desc.pluginManager(); GridCacheContext ctx = createCache( - ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); + ccfg, null, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); ctx.dynamicDeploymentId(desc.deploymentId()); @@ -909,9 +932,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { } // Must call onKernalStart on shared managers after creation of fetched caches. - for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) + for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) { if (sharedCtx.database() != mgr) mgr.onKernalStart(false); + } // Escape if start active on start false if (!activeOnStart) @@ -945,7 +969,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; + // TODO IGNITE-5075. + // assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; if (!ctx.clientNode() && !ctx.isDaemon()) addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000)); @@ -1442,6 +1467,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed to create cache. */ private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, + CacheGroupInfrastructure grp, @Nullable CachePluginManager pluginMgr, CacheType cacheType, CacheObjectContext cacheObjCtx, @@ -1517,6 +1543,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, sharedCtx, cfg, + grp, cacheType, affNode, updatesAllowed, @@ -1648,6 +1675,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, sharedCtx, cfg, + grp, cacheType, affNode, true, @@ -1826,20 +1854,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert req.start() : req; assert req.cacheType() != null : req; - DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); + DynamicCacheDescriptor desc = req.cacheDescriptor(); - if (desc != null) - desc.onStart(); + assert desc != null; + + desc.onStart(); prepareCacheStart( req.startCacheConfiguration(), req.nearCacheConfiguration(), req.cacheType(), + desc.groupId(), req.clientStartOnly(), req.initiatingNodeId(), req.deploymentId(), topVer, - desc != null ? desc.schema() : null + desc.schema() ); } @@ -1873,6 +1903,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { desc.cacheConfiguration(), null, desc.cacheType(), + desc.groupId(), false, null, desc.deploymentId(), @@ -1886,6 +1917,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { return started; } + private CacheGroupInfrastructure startCacheGroup(CacheConfiguration cfg0, int grpId) throws IgniteCheckedException { + CacheConfiguration ccfg = new CacheConfiguration(cfg0); + + CacheGroupInfrastructure grp = new CacheGroupInfrastructure(grpId, sharedCtx, ccfg); + + grp.start(); + + CacheGroupInfrastructure old = cacheGrps.put(grpId, grp); + + assert old == null; + + return grp; + } + /** * @param cfg Start configuration. * @param nearCfg Near configuration. @@ -1901,6 +1946,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration cfg, NearCacheConfiguration nearCfg, CacheType cacheType, + int grpId, boolean clientStartOnly, UUID initiatingNodeId, IgniteUuid deploymentId, @@ -1920,6 +1966,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { return; if (affNodeStart || clientNodeStart || CU.isSystemCache(cfg.getName())) { + String grpName = cfg.getGroupName(); + + CacheGroupInfrastructure grp = null; + + if (grpName != null) { + for (CacheGroupInfrastructure grp0 : cacheGrps.values()) { + if (grp0.sharedGroup() && grpName.equals(grp0.groupName())) { + grp = grp0; + + break; + } + } + + if (grp == null) + grp = startCacheGroup(cfg, grpId); + } + else + grp = startCacheGroup(cfg, grpId); + if (clientNodeStart && !affNodeStart) { if (nearCfg != null) ccfg.setNearConfiguration(nearCfg); @@ -1929,7 +1994,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true); + GridCacheContext cacheCtx = createCache(ccfg, grp, null, cacheType, cacheObjCtx, true); cacheCtx.startTopologyVersion(topVer); @@ -2307,6 +2372,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, ccfg, req.cacheType(), + 0, true, req.deploymentId(), req.schema()); @@ -2349,6 +2415,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx, ccfg, req.cacheType(), + nextCacheGroupId(ccfg), false, req.deploymentId(), req.schema()); @@ -3026,8 +3093,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = registeredTemplates.get(maskNull(req.cacheName())); if (desc == null) { - DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, - req.deploymentId(), req.schema()); + DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, + ccfg, + req.cacheType(), + 0, + true, + req.deploymentId(), + req.schema()); DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc); @@ -3070,8 +3142,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert req.cacheType() != null : req; assert F.eq(ccfg.getName(), req.cacheName()) : req; - DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, - req.deploymentId(), req.schema()); + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, + ccfg, + req.cacheType(), + nextCacheGroupId(ccfg), + false, + req.deploymentId(), + req.schema()); if (newTopVer == null) { newTopVer = new AffinityTopologyVersion(topVer.topologyVersion(), @@ -3095,6 +3172,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.initiatingNodeId(), req.nearCacheConfiguration() != null); + req.cacheDescriptor(startDesc); + needExchange = true; } } @@ -3135,6 +3214,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } desc.clientCacheStartVersion(newTopVer); + + req.cacheDescriptor(desc); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 9f1b96e..43e6af7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -511,7 +511,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { continue; } - GridCacheEntryEx entry = null; + GridCacheEntryEx entry; while (true) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 1286ba9..4eb94ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -338,7 +338,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { assert !F.isEmpty(reqs); assert topVer != null; - for (DynamicCacheChangeRequest req : reqs) + for (DynamicCacheChangeRequest req : reqs) { if (req.globalStateChange()) { ChangeGlobalStateContext cgsCtx = lastCgsCtx; @@ -348,6 +348,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { return true; } + } return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index eb0981b..326429f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -224,6 +224,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen * */ private void onKernalStart0(boolean activeOnStart){ + // TODO IGNITE-5075. + if (true) + return; + if (!activeOnStart && ctx.state().active()) ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index afe9fea..7d7f766 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -218,6 +218,10 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { + // TODO IGNITE-5075. + if (true) + return; + if (ctx.isDaemon() || !ctx.state().active()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java new file mode 100644 index 0000000..58e8722 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; + +/** + * + */ +@SuppressWarnings("unchecked") +public class IgniteCacheGroupsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testCreateCache1() throws Exception { + Ignite ignite = ignite(0); + + IgniteCache<Object, Object> cache1 = ignite.createCache(cacheConfiguration("grp1", "cache1", ATOMIC)); + //IgniteCache<Object, Object> cache2 = ignite.createCache(cacheConfiguration("grp1", "cache2", ATOMIC)); + + cache1.put(new Key1(1), 1); + assertEquals(1, cache1.get(new Key1(1))); + + //assertFalse(cache2.iterator().hasNext()); + +// cache2.put(new Key2(1), 2); +// assertEquals(2, cache2.get(new Key2(1))); + } + + /** + * + */ + static class Key1 implements Serializable { + /** */ + private int id; + + /** + * @param id ID. + */ + Key1(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Key1 key = (Key1)o; + + return id == key.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ + static class Key2 implements Serializable { + /** */ + private int id; + + /** + * @param id ID. + */ + Key2(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Key2 key = (Key2)o; + + return id == key.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + private CacheConfiguration cacheConfiguration(String grpName, String name, CacheAtomicityMode atomicityMode) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setGroupName(grpName); + ccfg.setAtomicityMode(atomicityMode); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e6927e57/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 0f4aa87..57db6c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -77,6 +77,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { null ), defaultCacheConfiguration(), + null, CacheType.USER, true, true,
