ignite-1811
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56b9da06 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56b9da06 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56b9da06 Branch: refs/heads/ignite-1811 Commit: 56b9da06314598e6fe1bb41d8a86de241e3b3095 Parents: a728ecc Author: sboikov <[email protected]> Authored: Fri Jan 15 14:08:59 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Jan 15 14:08:59 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 19 +- .../dht/GridPartitionedGetFuture.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 2 +- .../distributed/near/GridNearGetFuture.java | 2 +- .../internal/TestRecordingCommunicationSpi.java | 82 ++++- ...idCacheConfigurationConsistencySelfTest.java | 58 +--- ...niteCacheClientNodeChangingTopologyTest.java | 4 +- .../IgniteCacheReadFromBackupTest.java | 343 ++++++++++++++++++- .../IgniteCrossCacheTxStoreSelfTest.java | 1 + .../near/GridCacheGetStoreErrorSelfTest.java | 9 +- 10 files changed, 440 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index df72774..fc48b9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1435,6 +1435,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if store and read-through mode are enabled in configuration. + */ + public boolean readThroughConfigured() { + return store().configured() && cacheCfg.isReadThrough(); + } + + /** * @return {@code True} if {@link CacheConfiguration#isLoadPreviousValue()} flag is set. */ public boolean loadPreviousValue() { @@ -1981,16 +1988,10 @@ public class GridCacheContext<K, V> implements Externalizable { private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) { assert affinityNode(); - return (topology().rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode))) - || partitionOwned(part); - } + GridDhtPartitionTopology top = topology(); - /** - * @param part Partition. - * @return {@code True} if partition is in owned state. - */ - private boolean partitionOwned(int part) { - return topology().partitionState(localNodeId(), part) == OWNING; + return (top.rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode))) + || (top.partitionState(localNodeId(), part) == OWNING); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index be50384..2bc6869 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -507,7 +507,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); // Entry not found, do not continue search if topology did not change and there is no store. - if (!cctx.store().configured() && (topStable || partitionOwned(part))) { + if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) { if (!skipVals && cctx.config().isStatisticsEnabled()) cache.metrics0().onRead(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index ba14151..96c20e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -427,7 +427,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); // Entry not found, complete future with null result if topology did not change and there is no store. - if (!cctx.store().configured() && (topStable || partitionOwned(part))) { + if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) { if (!skipVals && cctx.config().isStatisticsEnabled()) colocated.metrics0().onRead(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 4802539..12a1b74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -620,7 +620,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); // Entry not found, do not continue search if topology did not change and there is no store. - return !cctx.store().configured() && (topStable || partitionOwned(part)); + return !cctx.readThroughConfigured() && (topStable || partitionOwned(part)); } } catch (GridCacheEntryRemovedException ignored) { http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index bf84387..8a602ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -18,16 +18,24 @@ package org.apache.ignite.internal; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; + /** * */ @@ -38,15 +46,46 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** */ private List<Object> recordedMsgs = new ArrayList<>(); + /** */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); + + /** */ + private Map<Class<?>, Set<String>> blockCls = new HashMap<>(); + + /** */ + private IgnitePredicate<GridIoMessage> blockP; + /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { if (msg instanceof GridIoMessage) { - Object msg0 = ((GridIoMessage)msg).message(); + GridIoMessage ioMsg = (GridIoMessage)msg; + + Object msg0 = ioMsg.message(); synchronized (this) { if (recordCls != null && msg0.getClass().equals(recordCls)) recordedMsgs.add(msg0); + + boolean block = false; + + if (blockP != null && blockP.apply(ioMsg)) + block = true; + else { + Set<String> blockNodes = blockCls.get(msg0.getClass()); + + if (blockNodes != null) { + String nodeName = (String)node.attributes().get(ATTR_GRID_NAME); + + block = blockNodes.contains(nodeName); + } + } + + if (block) { + blockedMsgs.add(new T2<>(node, ioMsg)); + + return; + } } } @@ -74,4 +113,45 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { return msgs; } } + + /** + * @param blockP Message block predicate. + */ + public void blockMessages(IgnitePredicate<GridIoMessage> blockP) { + synchronized (this) { + this.blockP = blockP; + } + } + + /** + * @param cls Message class. + * @param nodeName Node name. + */ + public void blockMessages(Class<?> cls, String nodeName) { + synchronized (this) { + Set<String> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeName); + } + } + + /** + * Stops block messages and sends all already blocked messages. + */ + public void stopBlock() { + synchronized (this) { + blockCls.clear(); + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) + super.sendMessage(msg.get1(), msg.get2()); + + blockedMsgs.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java index e28e89f..a1f917f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@ -19,25 +19,19 @@ package org.apache.ignite.internal.processors.cache; import java.io.Externalizable; import java.io.Serializable; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; import java.util.concurrent.Callable; import javax.cache.Cache; -import javax.cache.integration.CacheLoaderException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheInterceptorAdapter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver; -import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.eviction.EvictionFilter; import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy; import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; import org.apache.ignite.cache.eviction.random.RandomEvictionPolicy; -import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DeploymentMode; @@ -46,7 +40,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -54,7 +47,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -862,49 +854,9 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac }, IgniteCheckedException.class, null); } - /** */ - private static class TestStore implements CacheStore<Object,Object> { - /** {@inheritDoc} */ - @Nullable @Override public Object load(Object key) { - return null; - } - - /** {@inheritDoc} */ - @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { - return Collections.emptyMap(); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> entry) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void deleteAll(Collection<?> keys) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void sessionEnd(boolean commit) { - // No-op. - } - } - + /** + * + */ private static class TestRendezvousAffinityFunction extends RendezvousAffinityFunction { /** * Empty constructor required by {@link Externalizable}. @@ -941,6 +893,10 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac // No-op, just different class. } + /** + * + */ private static class TestCacheDefaultAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper { + // No-op, just different class. } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index e7657a6..13f2598 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -2010,7 +2010,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac private List<Object> recordedMsgs = new ArrayList<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -2032,7 +2032,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } } - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java index ee72909..af018cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java @@ -17,17 +17,42 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + /** * */ @@ -50,6 +75,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { return cfg; } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -67,48 +93,335 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testGetFromBackupStoreReadThroughEnabled() throws Exception { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + log.info("Check node: " + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + TestRecordingCommunicationSpi spi = recordGetRequests(ignite, near); + + Integer key = backupKey(cache); + + assertNull(cache.get(key)); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(1, msgs.size()); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetFromBackupStoreReadThroughDisabled() throws Exception { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(false); + + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + checkLocalRead(NODES, ccfg); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetFromPrimaryPreloadInProgress() throws Exception { + for (final CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + Map<Ignite, Integer> backupKeys = new HashMap<>(); + Map<Ignite, Integer> nearKeys = new HashMap<>(); + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + backupKeys.put(ignite, backupKey(cache)); + + if (ccfg.getCacheMode() == PARTITIONED) + nearKeys.put(ignite, nearKey(cache)); + + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + spi.blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage ioMsg) { + if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class)) + return false; + + GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message(); + + return msg.cacheId() == CU.cacheId(ccfg.getName()); + } + }); + } + + try (Ignite newNode = startGrid(NODES)) { + IgniteCache<Integer, Integer> cache = newNode.cache(ccfg.getName()); + + TestRecordingCommunicationSpi newNodeSpi = recordGetRequests(newNode, near); + + Integer key = backupKey(cache); + + assertNull(cache.get(key)); + + List<Object> msgs = newNodeSpi.recordedMessages(); + + assertEquals(1, msgs.size()); + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + log.info("Check node: " + ignite.name()); + + checkLocalRead(ignite, ccfg, backupKeys.get(ignite), nearKeys.get(ignite)); + } + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + spi.stopBlock(); + } + + awaitPartitionMapExchange(); + + checkLocalRead(NODES + 1, ccfg); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ public void testNoPrimaryReadPreloadFinished() throws Exception { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + checkLocalRead(NODES, ccfg); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } } /** + * @param nodes Number of nodes. * @param ccfg Cache configuration. * @throws Exception If failed. */ - private void checkNoPrimaryRead(CacheConfiguration<Object, Object> ccfg) throws Exception { - ignite(0).createCache(ccfg); + private void checkLocalRead(int nodes, CacheConfiguration<Object, Object> ccfg) throws Exception { + for (int i = 0; i < nodes; i++) { + Ignite ignite = ignite(i); - try { - for (int i = 0; i < NODES; i++) { - Ignite ignite = ignite(i); + log.info("Check node: " + ignite.name()); - IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); - TestRecordingCommunicationSpi spi = - (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + List<Integer> backupKeys = backupKeys(cache, 2, 0); - Integer key = nearKey(cache); + Integer backupKey = backupKeys.get(0); - assertNull(cache.get(key)); - } + Integer nearKey = ccfg.getCacheMode() == PARTITIONED ? nearKey(cache) : null; + + checkLocalRead(ignite, ccfg, backupKey, nearKey); + + Set<Integer> keys = new HashSet<>(backupKeys); + + Map<Integer, Integer> vals = cache.getAll(keys); + + for (Integer key : keys) + assertNull(vals.get(key)); + + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(0, msgs.size()); } - finally { - ignite(0).destroyCache(ccfg.getName()); + } + + /** + * @param ignite Node. + * @param ccfg Cache configuration. + * @param backupKey Backup key. + * @param nearKey Near key. + * @throws Exception If failed. + */ + private void checkLocalRead(Ignite ignite, + CacheConfiguration<Object, Object> ccfg, + Integer backupKey, + Integer nearKey) throws Exception { + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + TestRecordingCommunicationSpi spi = recordGetRequests(ignite, ccfg.getNearConfiguration() != null); + + List<Object> msgs; + + if (nearKey != null) { + assertNull(cache.get(nearKey)); + + msgs = spi.recordedMessages(); + + assertEquals(1, msgs.size()); } + + assertNull(cache.get(backupKey)); + + msgs = spi.recordedMessages(); + + assertTrue(msgs.isEmpty()); + } + + /** + * @param ignite Node. + * @param near Near cache flag. + * @return Communication SPI. + */ + private TestRecordingCommunicationSpi recordGetRequests(Ignite ignite, boolean near) { + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + spi.record(near ? GridNearGetRequest.class : GridNearSingleGetRequest.class); + + return spi; + } + + /** + * @return Cache configurations to test. + */ + private List<CacheConfiguration<Object, Object>> cacheConfigurations() { + List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(REPLICATED, ATOMIC, 0, false)); + ccfgs.add(cacheConfiguration(REPLICATED, TRANSACTIONAL, 0, false)); + + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 1, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 1, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 2, false)); + + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 2, false)); + + return ccfgs; } /** * @param cacheMode Cache mode. + * @param atomicityMode Cache atomicity mode. * @param backups Number of backups. + * @param nearEnabled {@code True} if near cache should be enabled. * @return Cache configuration. */ - private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode, int backups) { + private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + int backups, + boolean nearEnabled) { CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(atomicityMode); - if (cacheMode != CacheMode.REPLICATED) + if (cacheMode != REPLICATED) { ccfg.setBackups(backups); + if (nearEnabled) + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + } + return ccfg; } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index 68cac17..94613db 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -300,6 +300,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { throws CacheLoaderException { } + /** {@inheritDoc} */ @Override public void sessionEnd(boolean commit) throws CacheWriterException { evts.offer("sessionEnd " + commit); } http://git-wip-us.apache.org/repos/asf/ignite/blob/56b9da06/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java index 7bd845a..3e6a245 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java @@ -107,7 +107,11 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest { checkGetError(false, LOCAL); } - /** @throws Exception If failed. */ + /** + * @param nearEnabled Near cache flag. + * @param cacheMode Cache mode. + * @throws Exception If failed. + */ private void checkGetError(boolean nearEnabled, CacheMode cacheMode) throws Exception { this.nearEnabled = nearEnabled; this.cacheMode = cacheMode; @@ -147,14 +151,17 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest { */ @SuppressWarnings("PublicInnerClass") public static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ @Override public Object load(Object key) { throw new IgniteException("Failed to get key from store: " + key); } + /** {@inheritDoc} */ @Override public void write(Cache.Entry<?, ?> entry) { // No-op. } + /** {@inheritDoc} */ @Override public void delete(Object key) { // No-op. }
