Repository: ignite Updated Branches: refs/heads/master d85616b9b -> 83b2bf5e1
http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java new file mode 100644 index 0000000..b14109b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; + +/** + * + */ +public class IgniteCacheGetRestartTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final long TEST_TIME = 60_000; + + /** */ + private static final int SRVS = 3; + + /** */ + private static final int CLIENTS = 1; + + /** */ + private static final int KEYS = 100_000; + + /** */ + private ThreadLocal<Boolean> client = new ThreadLocal<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + Boolean clientMode = client.get(); + + if (clientMode != null) { + cfg.setClientMode(clientMode); + + client.remove(); + } + + cfg.setConsistentId(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(SRVS); + + for (int i = 0; i < CLIENTS; i++) { + client.set(true); + + Ignite client = startGrid(SRVS); + + assertTrue(client.configuration().isClientMode()); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIME + 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testGetRestartReplicated() throws Exception { + CacheConfiguration<Object, Object> cache = cacheConfiguration(REPLICATED, 0, false); + + checkRestart(cache, 3); + } + + /** + * @throws Exception If failed. + */ + public void testGetRestartPartitioned1() throws Exception { + CacheConfiguration<Object, Object> cache = cacheConfiguration(PARTITIONED, 1, false); + + checkRestart(cache, 1); + } + + /** + * @throws Exception If failed. + */ + public void testGetRestartPartitioned2() throws Exception { + CacheConfiguration<Object, Object> cache = cacheConfiguration(PARTITIONED, 2, false); + + checkRestart(cache, 2); + } + + /** + * @throws Exception If failed. + */ + public void testGetRestartPartitionedNearEnabled() throws Exception { + CacheConfiguration<Object, Object> cache = cacheConfiguration(PARTITIONED, 1, true); + + checkRestart(cache, 1); + } + + /** + * @param ccfg Cache configuration. + * @param restartCnt Number of nodes to restart. + * @throws Exception If failed. + */ + private void checkRestart(final CacheConfiguration ccfg, final int restartCnt) throws Exception { + ignite(0).createCache(ccfg); + + try { + if (ccfg.getNearConfiguration() != null) + ignite(SRVS).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + + try (IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(ccfg.getName())) { + for (int i = 0; i < KEYS; i++) + streamer.addData(i, i); + } + + final long stopTime = U.currentTimeMillis() + TEST_TIME; + + final AtomicInteger nodeIdx = new AtomicInteger(); + + IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite ignite = ignite(nodeIdx.getAndIncrement()); + + log.info("Check get [node=" + ignite.name() + + ", client=" + ignite.configuration().isClientMode() + ']'); + + IgniteCache<Object, Object> cache = ignite.cache(ccfg.getName()); + + while (U.currentTimeMillis() < stopTime) + checkGet(cache); + + return null; + } + }, SRVS + CLIENTS, "get-thread"); + + final AtomicInteger restartNodeIdx = new AtomicInteger(SRVS + CLIENTS); + + final AtomicBoolean clientNode = new AtomicBoolean(); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = restartNodeIdx.getAndIncrement(); + + boolean clientMode = clientNode.compareAndSet(false, true); + + while (U.currentTimeMillis() < stopTime) { + if (clientMode) + client.set(true); + + log.info("Restart node [node=" + nodeIdx + ", client=" + clientMode + ']'); + + Ignite ignite = startGrid(nodeIdx); + + IgniteCache<Object, Object> cache; + + if (clientMode && ccfg.getNearConfiguration() != null) + cache = ignite.createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + else + cache = ignite.cache(ccfg.getName()); + + checkGet(cache); + + IgniteInternalFuture<?> syncFut = ((IgniteCacheProxy)cache).context().preloader().syncFuture(); + + while (!syncFut.isDone()) + checkGet(cache); + + checkGet(cache); + + stopGrid(nodeIdx); + } + + return null; + } + }, restartCnt + 1, "restart-thread"); + + fut1.get(); + fut2.get(); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param cache Cache. + */ + private void checkGet(IgniteCache<Object, Object> cache) { + for (int i = 0; i < KEYS; i++) + assertEquals(i, cache.get(i)); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < KEYS; i++) { + keys.add(i); + + if (keys.size() == 100) { + Map<Object, Object> vals = cache.getAll(keys); + + for (Object key : keys) + assertEquals(key, vals.get(key)); + + keys.clear(); + } + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param near If {@code true} near cache is enabled. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode, int backups, boolean near) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + + if (cacheMode != REPLICATED) + ccfg.setBackups(backups); + + if (near) + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + + ccfg.setRebalanceMode(ASYNC); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java new file mode 100644 index 0000000..af018cc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testGetFromBackupStoreReadThroughEnabled() throws Exception { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + log.info("Check node: " + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + TestRecordingCommunicationSpi spi = recordGetRequests(ignite, near); + + Integer key = backupKey(cache); + + assertNull(cache.get(key)); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(1, msgs.size()); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetFromBackupStoreReadThroughDisabled() throws Exception { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(false); + + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + checkLocalRead(NODES, ccfg); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetFromPrimaryPreloadInProgress() throws Exception { + for (final CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + Map<Ignite, Integer> backupKeys = new HashMap<>(); + Map<Ignite, Integer> nearKeys = new HashMap<>(); + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + backupKeys.put(ignite, backupKey(cache)); + + if (ccfg.getCacheMode() == PARTITIONED) + nearKeys.put(ignite, nearKey(cache)); + + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + spi.blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage ioMsg) { + if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessageV2.class)) + return false; + + GridDhtPartitionSupplyMessageV2 msg = (GridDhtPartitionSupplyMessageV2)ioMsg.message(); + + return msg.cacheId() == CU.cacheId(ccfg.getName()); + } + }); + } + + try (Ignite newNode = startGrid(NODES)) { + IgniteCache<Integer, Integer> cache = newNode.cache(ccfg.getName()); + + TestRecordingCommunicationSpi newNodeSpi = recordGetRequests(newNode, near); + + Integer key = backupKey(cache); + + assertNull(cache.get(key)); + + List<Object> msgs = newNodeSpi.recordedMessages(); + + assertEquals(1, msgs.size()); + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + log.info("Check node: " + ignite.name()); + + checkLocalRead(ignite, ccfg, backupKeys.get(ignite), nearKeys.get(ignite)); + } + + for (int i = 0; i < NODES; i++) { + Ignite ignite = ignite(i); + + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + spi.stopBlock(); + } + + awaitPartitionMapExchange(); + + checkLocalRead(NODES + 1, ccfg); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testNoPrimaryReadPreloadFinished() throws Exception { + for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) { + boolean near = (ccfg.getNearConfiguration() != null); + + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", atomicity=" + ccfg.getAtomicityMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + near + "]"); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + try { + checkLocalRead(NODES, ccfg); + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @param nodes Number of nodes. + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void checkLocalRead(int nodes, CacheConfiguration<Object, Object> ccfg) throws Exception { + for (int i = 0; i < nodes; i++) { + Ignite ignite = ignite(i); + + log.info("Check node: " + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + List<Integer> backupKeys = backupKeys(cache, 2, 0); + + Integer backupKey = backupKeys.get(0); + + Integer nearKey = ccfg.getCacheMode() == PARTITIONED ? nearKey(cache) : null; + + checkLocalRead(ignite, ccfg, backupKey, nearKey); + + Set<Integer> keys = new HashSet<>(backupKeys); + + Map<Integer, Integer> vals = cache.getAll(keys); + + for (Integer key : keys) + assertNull(vals.get(key)); + + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(0, msgs.size()); + } + } + + /** + * @param ignite Node. + * @param ccfg Cache configuration. + * @param backupKey Backup key. + * @param nearKey Near key. + * @throws Exception If failed. + */ + private void checkLocalRead(Ignite ignite, + CacheConfiguration<Object, Object> ccfg, + Integer backupKey, + Integer nearKey) throws Exception { + IgniteCache<Integer, Integer> cache = ignite.cache(ccfg.getName()); + + TestRecordingCommunicationSpi spi = recordGetRequests(ignite, ccfg.getNearConfiguration() != null); + + List<Object> msgs; + + if (nearKey != null) { + assertNull(cache.get(nearKey)); + + msgs = spi.recordedMessages(); + + assertEquals(1, msgs.size()); + } + + assertNull(cache.get(backupKey)); + + msgs = spi.recordedMessages(); + + assertTrue(msgs.isEmpty()); + } + + /** + * @param ignite Node. + * @param near Near cache flag. + * @return Communication SPI. + */ + private TestRecordingCommunicationSpi recordGetRequests(Ignite ignite, boolean near) { + TestRecordingCommunicationSpi spi = + (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); + + spi.record(near ? GridNearGetRequest.class : GridNearSingleGetRequest.class); + + return spi; + } + + /** + * @return Cache configurations to test. + */ + private List<CacheConfiguration<Object, Object>> cacheConfigurations() { + List<CacheConfiguration<Object, Object>> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(REPLICATED, ATOMIC, 0, false)); + ccfgs.add(cacheConfiguration(REPLICATED, TRANSACTIONAL, 0, false)); + + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 1, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 1, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, 2, false)); + + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 1, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, 2, false)); + + return ccfgs; + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Cache atomicity mode. + * @param backups Number of backups. + * @param nearEnabled {@code True} if near cache should be enabled. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + int backups, + boolean nearEnabled) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(atomicityMode); + + if (cacheMode != REPLICATED) { + ccfg.setBackups(backups); + + if (nearEnabled) + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + } + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java index 42b5ee3..48fc961 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java @@ -21,26 +21,19 @@ import java.util.ArrayList; import java.util.List; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -69,7 +62,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { cfg.setClientMode(client); - TestCommunicationSpi commSpi = new TestCommunicationSpi(); + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); cfg.setCommunicationSpi(commSpi); @@ -156,7 +149,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { Ignite node = cache.unwrap(Ignite.class); - TestCommunicationSpi spi = (TestCommunicationSpi)node.configuration().getCommunicationSpi(); + TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi(); spi.record(GridNearSingleGetRequest.class); @@ -164,17 +157,24 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { assertNotSame(node, primary); - TestCommunicationSpi primarySpi = (TestCommunicationSpi)primary.configuration().getCommunicationSpi(); + TestRecordingCommunicationSpi primarySpi = + (TestRecordingCommunicationSpi)primary.configuration().getCommunicationSpi(); primarySpi.record(GridNearSingleGetResponse.class); assertNull(cache.get(key)); - checkMessages(spi, primarySpi); + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); assertFalse(cache.containsKey(key)); - checkMessages(spi, primarySpi); + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); cache.put(key, 1); @@ -201,7 +201,10 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { tx.commit(); } - checkMessages(spi, primarySpi); + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { assertFalse(cache.containsKey(key)); @@ -209,7 +212,10 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { tx.commit(); } - checkMessages(spi, primarySpi); + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); cache.put(key, 1); @@ -241,7 +247,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { * @param spi Near node SPI. * @param primarySpi Primary node SPI. */ - private void checkMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) { + private void checkMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) { List<Object> msgs = spi.recordedMessages(); assertEquals(1, msgs.size()); @@ -257,7 +263,7 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { * @param spi Near node SPI. * @param primarySpi Primary node SPI. */ - private void checkNoMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) { + private void checkNoMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) { List<Object> msgs = spi.recordedMessages(); assertEquals(0, msgs.size()); @@ -306,52 +312,4 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { return ccfg; } - - /** - * - */ - private static class TestCommunicationSpi extends TcpCommunicationSpi { - /** */ - private Class<?> recordCls; - - /** */ - private List<Object> recordedMsgs = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) - throws IgniteSpiException { - if (msg instanceof GridIoMessage) { - Object msg0 = ((GridIoMessage)msg).message(); - - synchronized (this) { - if (recordCls != null && msg0.getClass().equals(recordCls)) - recordedMsgs.add(msg0); - } - } - - super.sendMessage(node, msg, ackC); - } - - /** - * @param recordCls Message class to record. - */ - void record(@Nullable Class<?> recordCls) { - synchronized (this) { - this.recordCls = recordCls; - } - } - - /** - * @return Recorded messages. - */ - List<Object> recordedMessages() { - synchronized (this) { - List<Object> msgs = recordedMsgs; - - recordedMsgs = new ArrayList<>(); - - return msgs; - } - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index 68cac17..94613db 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -300,6 +300,7 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { throws CacheLoaderException { } + /** {@inheritDoc} */ @Override public void sessionEnd(boolean commit) throws CacheWriterException { evts.offer("sessionEnd " + commit); } http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java index fd94150..0666349 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java @@ -17,25 +17,17 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -79,7 +71,11 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest c.setDiscoverySpi(disco); c.setCacheConfiguration(cc); - c.setCommunicationSpi(new TestCommunicationSpi()); + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + commSpi.record(GridDhtPartitionsSingleMessage.class); + + c.setCommunicationSpi(commSpi); return c; } @@ -110,11 +106,13 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest IgniteCache<String, Integer> c1 = g1.cache(null); IgniteCache<String, Integer> c2 = g2.cache(null); - TestCommunicationSpi spi0 = (TestCommunicationSpi)g0.configuration().getCommunicationSpi(); - TestCommunicationSpi spi1 = (TestCommunicationSpi)g1.configuration().getCommunicationSpi(); - TestCommunicationSpi spi2 = (TestCommunicationSpi)g2.configuration().getCommunicationSpi(); + TestRecordingCommunicationSpi spi0 = (TestRecordingCommunicationSpi)g0.configuration().getCommunicationSpi(); + TestRecordingCommunicationSpi spi1 = (TestRecordingCommunicationSpi)g1.configuration().getCommunicationSpi(); + TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)g2.configuration().getCommunicationSpi(); - info(spi0.sentMessages().size() + " " + spi1.sentMessages().size() + " " + spi2.sentMessages().size()); + info(spi0.recordedMessages().size() + " " + + spi1.recordedMessages().size() + " " + + spi2.recordedMessages().size()); checkCache(c0, cnt); checkCache(c1, cnt); @@ -137,40 +135,4 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest assertEquals(Integer.valueOf(i), c.localPeek(key, CachePeekMode.ONHEAP)); } } - - /** - * Communication SPI that will count single partition update messages. - */ - private static class TestCommunicationSpi extends TcpCommunicationSpi { - /** Recorded messages. */ - private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>(); - - /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) - throws IgniteSpiException { - recordMessage((GridIoMessage)msg); - - super.sendMessage(node, msg, ackClosure); - } - - /** - * @return Collection of sent messages. - */ - public Collection<GridDhtPartitionsSingleMessage> sentMessages() { - return sentMsgs; - } - - /** - * Adds message to a list if message is of correct type. - * - * @param msg Message. - */ - private void recordMessage(GridIoMessage msg) { - if (msg.message() instanceof GridDhtPartitionsSingleMessage) { - GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message(); - - sentMsgs.add(partSingleMsg); - } - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java index 7bd845a..3e6a245 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheGetStoreErrorSelfTest.java @@ -107,7 +107,11 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest { checkGetError(false, LOCAL); } - /** @throws Exception If failed. */ + /** + * @param nearEnabled Near cache flag. + * @param cacheMode Cache mode. + * @throws Exception If failed. + */ private void checkGetError(boolean nearEnabled, CacheMode cacheMode) throws Exception { this.nearEnabled = nearEnabled; this.cacheMode = cacheMode; @@ -147,14 +151,17 @@ public class GridCacheGetStoreErrorSelfTest extends GridCommonAbstractTest { */ @SuppressWarnings("PublicInnerClass") public static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ @Override public Object load(Object key) { throw new IgniteException("Failed to get key from store: " + key); } + /** {@inheritDoc} */ @Override public void write(Cache.Entry<?, ?> entry) { // No-op. } + /** {@inheritDoc} */ @Override public void delete(Object key) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java index 684d6e4..f32a5f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNodeRestartTest.java @@ -23,7 +23,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.distributed.GridCacheAbstractNodeRestartSelfTest; import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; /** @@ -46,7 +46,7 @@ public class GridCachePartitionedNodeRestartTest extends GridCacheAbstractNodeRe cc.setName(CACHE_NAME); cc.setAtomicityMode(atomicityMode()); cc.setCacheMode(PARTITIONED); - cc.setWriteSynchronizationMode(FULL_ASYNC); + cc.setWriteSynchronizationMode(FULL_SYNC); cc.setNearConfiguration(null); cc.setStartSize(20); cc.setRebalanceMode(rebalancMode); http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java index a458aa7..ab7caad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedOptimisticTxNodeRestartTest.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheAbstract import org.apache.ignite.transactions.TransactionConcurrency; import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; /** @@ -53,7 +53,7 @@ public class GridCachePartitionedOptimisticTxNodeRestartTest extends GridCacheAb cc.setName(CACHE_NAME); cc.setCacheMode(PARTITIONED); - cc.setWriteSynchronizationMode(FULL_ASYNC); + cc.setWriteSynchronizationMode(FULL_SYNC); cc.setStartSize(20); cc.setRebalanceMode(rebalancMode); cc.setRebalanceBatchSize(rebalancBatchSize); http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java index de87e99..0513786 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailov import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGetRestartTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest; /** @@ -45,6 +46,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite { suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class); + suite.addTestSuite(IgniteCacheGetRestartTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 04d0881..68e52df 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest; @@ -289,6 +290,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheGetFutureHangsSelfTest.class); suite.addTestSuite(IgniteCacheSingleGetMessageTest.class); + suite.addTestSuite(IgniteCacheReadFromBackupTest.class); suite.addTestSuite(IgniteCacheGetCustomCollectionsSelfTest.class); suite.addTestSuite(IgniteCacheLoadRebalanceEvictionSelfTest.class);