Repository: ignite Updated Branches: refs/heads/ignite-5075 103f68160 -> fb3c784d7
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb3c784d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb3c784d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb3c784d Branch: refs/heads/ignite-5075 Commit: fb3c784d7d0e8ca7b24e66a75e1a4ea75a33b34a Parents: 103f681 Author: sboikov <sboi...@gridgain.com> Authored: Thu May 18 10:55:25 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 18 11:16:44 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 2 +- .../cache/CacheGroupInfrastructure.java | 8 +++++ .../processors/cache/GridCacheContext.java | 17 ----------- .../GridDhtPartitionsExchangeFuture.java | 32 +++++++++++--------- .../IgniteCachePartitionLossPolicySelfTest.java | 19 +----------- .../atomic/IgniteCacheAtomicProtocolTest.java | 7 +++-- 6 files changed, 31 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fb3c784d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index a46f616..ae5e3b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -406,7 +406,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - return res; + return res == null ? Collections.<String, Map<UUID,Boolean>>emptyMap() : res; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/fb3c784d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index e112bbc..d176822 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; @@ -169,6 +170,13 @@ public class CacheGroupInfrastructure { } /** + * @return {@code True} if this is cache group for one of system caches. + */ + public boolean systemCache() { + return !sharedGroup() && CU.isSystemCache(ccfg.getName()); + } + + /** * @return Node ID initiated cache group start. */ public UUID receivedFrom() { http://git-wip-us.apache.org/repos/asf/ignite/blob/fb3c784d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 2d5a046..92e002d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -237,9 +237,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** Updates allowed flag. */ private boolean updatesAllowed; - /** Flag indicating that this cache is in a recovery mode. */ - private boolean needsRecovery; - /** Deployment enabled flag for this specific cache */ private boolean depEnabled; @@ -1969,20 +1966,6 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * @return Current cache state. Must only be modified during exchange. - */ - public boolean needsRecovery() { - return needsRecovery; - } - - /** - * @param needsRecovery Needs recovery flag. - */ - public void needsRecovery(boolean needsRecovery) { - this.needsRecovery = needsRecovery; - } - - /** * Nulling references to potentially leak-prone objects. */ public void cleanup() { http://git-wip-us.apache.org/repos/asf/ignite/blob/fb3c784d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d916e7d..b07159b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -194,8 +194,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private CacheAffinityChangeMessage affChangeMsg; - /** Cache validation results. */ - private volatile Map<Integer, CacheValidation> cacheValidRes; + /** Cache groups validation results. */ + private volatile Map<Integer, CacheValidation> grpValidRes; /** Skip preload flag. */ private boolean skipPreload; @@ -1164,21 +1164,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT discoEvt.type() == EVT_NODE_JOINED) detectLostPartitions(); - Map<Integer, CacheValidation> m = new HashMap<>(cctx.cacheContexts().size()); + Map<Integer, CacheValidation> m = new HashMap<>(cctx.cache().cacheGroups().size()); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - Collection<Integer> lostParts = cacheCtx.isLocal() ? - Collections.<Integer>emptyList() : cacheCtx.topology().lostPartitions(); + for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { + Collection<Integer> lostParts = grp.isLocal() ? + Collections.<Integer>emptyList() : grp.topology().lostPartitions(); boolean valid = true; - if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) - valid = cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()); + if (grp.config().getTopologyValidator() != null && !grp.systemCache()) + valid = grp.config().getTopologyValidator().validate(discoEvt.topologyNodes()); - m.put(cacheCtx.cacheId(), new CacheValidation(valid, lostParts)); + m.put(grp.groupId(), new CacheValidation(valid, lostParts)); } - cacheValidRes = m; + grpValidRes = m; } cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err); @@ -1235,16 +1235,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return new CacheInvalidStateException( "Failed to perform cache operation (cluster is not activated): " + cctx.name()); - PartitionLossPolicy partLossPlc = cctx.config().getPartitionLossPolicy(); + CacheGroupInfrastructure grp = cctx.group(); + + PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy(); - if (cctx.needsRecovery() && !recovery) { + if (grp.needsRecovery() && !recovery) { if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL)) return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " + cctx.name()); } - if (cctx.needsRecovery() || cctx.config().getTopologyValidator() != null) { - CacheValidation validation = cacheValidRes.get(cctx.cacheId()); + if (grp.needsRecovery() || grp.config().getTopologyValidator() != null) { + CacheValidation validation = grpValidRes.get(grp.groupId()); if (validation == null) return null; @@ -1253,7 +1255,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return new IgniteCheckedException("Failed to perform cache operation " + "(cache topology is not valid): " + cctx.name()); - if (recovery || !cctx.needsRecovery()) + if (recovery || !grp.needsRecovery()) return null; if (key != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/fb3c784d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java index 480dc20..36d4ac5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.Collection; import java.util.Collections; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.cache.CacheException; @@ -42,7 +41,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.util.TestTcpCommunicationSpi; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -67,23 +65,8 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - disco.setIpFinder(ipFinder); - cfg.setDiscoverySpi(disco); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - if (gridName.matches(".*\\d")) { - String idStr = UUID.randomUUID().toString(); - - char[] chars = idStr.toCharArray(); - - chars[chars.length - 3] = '0'; - chars[chars.length - 2] = '0'; - chars[chars.length - 1] = gridName.charAt(gridName.length() - 1); - - cfg.setNodeId(UUID.fromString(new String(chars))); - } - - cfg.setCommunicationSpi(new TestTcpCommunicationSpi()); cfg.setClientMode(client); CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(CACHE_NAME); http://git-wip-us.apache.org/repos/asf/ignite/blob/fb3c784d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index c397941..749ebe8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -34,12 +34,11 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; @@ -100,11 +99,13 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { * */ private void blockRebalance() { + final int grpId = groupIdForCache(ignite(0), TEST_CACHE); + for (Ignite node : G.allGrids()) { testSpi(node).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { @Override public boolean apply(ClusterNode node, Message msg) { return (msg instanceof GridDhtPartitionSupplyMessage) - && ((GridCacheIdMessage)msg).cacheId() == CU.cacheId(TEST_CACHE); + && ((GridCacheGroupIdMessage)msg).groupId() == grpId; } }); }