http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java new file mode 100644 index 0000000..37689f8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + * + * Forum example <a + * href="http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449"> + * http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449</a> + */ +public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest { + /** Key count. */ + private static final int KEY_CNT = 1000; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = GridCachePreloadMode.SYNC; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + assert preloadMode != null; + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setPreloadMode(preloadMode); + cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 521)); + cc.setBackups(1); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + + c.setDiscoverySpi(disco); + c.setCacheConfiguration(cc); + + c.setCommunicationSpi(new TestCommunicationSpi()); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAutomaticPreload() throws Exception { + Ignite g0 = startGrid(0); + + int cnt = KEY_CNT; + + GridCache<String, Integer> c0 = g0.cache(null); + + for (int i = 0; i < cnt; i++) + c0.put(Integer.toString(i), i); + + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + U.sleep(1000); + + GridCache<String, Integer> c1 = g1.cache(null); + GridCache<String, Integer> c2 = g2.cache(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)g0.configuration().getCommunicationSpi(); + TestCommunicationSpi spi1 = (TestCommunicationSpi)g1.configuration().getCommunicationSpi(); + TestCommunicationSpi spi2 = (TestCommunicationSpi)g2.configuration().getCommunicationSpi(); + + info(spi0.sentMessages().size() + " " + spi1.sentMessages().size() + " " + spi2.sentMessages().size()); + + checkCache(c0, cnt); + checkCache(c1, cnt); + checkCache(c2, cnt); + } + + /** + * Checks if keys are present. + * + * @param c Cache. + * @param keyCnt Key count. + */ + private void checkCache(GridCache<String, Integer> c, int keyCnt) { + Ignite g = c.gridProjection().ignite(); + + for (int i = 0; i < keyCnt; i++) { + String key = Integer.toString(i); + + if (c.affinity().isPrimaryOrBackup(g.cluster().localNode(), key)) + assertEquals(Integer.valueOf(i), c.peek(key)); + } + } + + /** + * Communication SPI that will count single partition update messages. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Recorded messages. */ + private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + recordMessage((GridIoMessage)msg); + + super.sendMessage(node, msg); + } + + /** + * @return Collection of sent messages. + */ + public Collection<GridDhtPartitionsSingleMessage> sentMessages() { + return sentMsgs; + } + + /** + * Adds message to a list if message is of correct type. + * + * @param msg Message. + */ + private void recordMessage(GridIoMessage msg) { + if (msg.message() instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message(); + + sentMsgs.add(partSingleMsg); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java new file mode 100644 index 0000000..2cb3e6f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMultiThreadedSelfTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; + +/** + * MultiThreaded load test for DHT preloader. + */ +public class GridCacheDhtPreloadMultiThreadedSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * Creates new test. + */ + public GridCacheDhtPreloadMultiThreadedSelfTest() { + super(false); + } + + /** + * @throws Exception If failed. + */ + public void testNodeLeaveBeforePreloadingComplete() throws Exception { + try { + final CountDownLatch startLatch = new CountDownLatch(1); + + final CountDownLatch stopLatch = new CountDownLatch(1); + + GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + Ignite g = startGrid("first"); + + g.events().localListen( + new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + stopLatch.countDown(); + + return true; + } + }, + IgniteEventType.EVT_NODE_JOINED); + + startLatch.countDown(); + + stopLatch.await(); + + G.stop(g.name(), false); + + return null; + } + }, + 1, + "first" + ); + + GridTestUtils.runMultiThreaded( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + startLatch.await(); + + startGrid("second"); + + return null; + } + }, + 1, + "second" + ); + } + finally { + // Intentionally used this method. See startGrid(String, String). + G.stopAll(false); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentNodesStart() throws Exception { + try { + multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + IgniteConfiguration cfg = loadConfiguration("modules/core/src/test/config/spring-multicache.xml"); + + startGrid(Thread.currentThread().getName(), cfg); + + return null; + } + }, + 4, + "starter" + ).get(); + } + finally { + G.stopAll(true); + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentNodesStartStop() throws Exception { + try { + multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + String gridName = "grid-" + Thread.currentThread().getName(); + + startGrid(gridName, "modules/core/src/test/config/example-cache.xml"); + + // Immediately stop the grid. + stopGrid(gridName); + + return null; + } + }, + 6, + "tester" + ).get(); + } + finally { + G.stopAll(true); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = loadConfiguration("modules/core/src/test/config/spring-multicache.xml"); + + cfg.setGridName(gridName); + + for (CacheConfiguration cCfg : cfg.getCacheConfiguration()) { + if (cCfg.getCacheMode() == GridCacheMode.PARTITIONED) { + cCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(2048, null)); + cCfg.setBackups(1); + } + } + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java new file mode 100644 index 0000000..c1a9663 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadOffHeapSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; + +import static org.apache.ignite.cache.GridCacheMemoryMode.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader} with off-heap value storage. + */ +public class GridCacheDhtPreloadOffHeapSelfTest extends GridCacheDhtPreloadSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) { + CacheConfiguration cacheCfg = super.cacheConfiguration(gridName); + + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setMemoryMode(OFFHEAP_VALUES); + cacheCfg.setOffHeapMaxMemory(0); + + return cacheCfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java new file mode 100644 index 0000000..fab89af --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPutGetSelfTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + * + * Forum example <a + * href="http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449"> + * http://www.gridgainsystems.com/jiveforums/thread.jspa?threadID=1449</a> + */ +public class GridCacheDhtPreloadPutGetSelfTest extends GridCommonAbstractTest { + /** Key count. */ + private static final int KEY_CNT = 1000; + + /** Iterations count. */ + private static final int ITER_CNT = 10; + + /** Frequency. */ + private static final int FREQUENCY = 100; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + assert preloadMode != null; + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setPreloadMode(preloadMode); + cacheCfg.setBackups(backups); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPutGetAsync0() throws Exception { + preloadMode = ASYNC; + backups = 0; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetAsync1() throws Exception { + preloadMode = ASYNC; + backups = 1; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetAsync2() throws Exception { + preloadMode = ASYNC; + backups = 2; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetSync0() throws Exception { + preloadMode = SYNC; + backups = 0; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetSync1() throws Exception { + preloadMode = SYNC; + backups = 1; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetSync2() throws Exception { + preloadMode = SYNC; + backups = 2; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetNone0() throws Exception { + preloadMode = NONE; + backups = 0; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetNone1() throws Exception { + preloadMode = NONE; + backups = 1; + + performTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetNone2() throws Exception { + preloadMode = NONE; + backups = 2; + + performTest(); + } + + /** + * @throws Exception If test fails. + */ + private void performTest() throws Exception { + try { + final CountDownLatch writeLatch = new CountDownLatch(1); + + final CountDownLatch readLatch = new CountDownLatch(1); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture fut1 = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + Ignite g2 = startGrid(2); + + for (int i = 0; i < ITER_CNT; i++) { + info("Iteration # " + i); + + GridCache<Integer, Integer> cache = g2.cache(null); + + for (int j = 0; j < KEY_CNT; j++) { + GridCacheEntry<Integer, Integer> entry = cache.entry(j); + + assert entry != null; + + Integer val = entry.getValue(); + + if (j % FREQUENCY == 0) + info("Read entry: " + entry.getKey() + " -> " + val); + + if (done.get()) + assert val != null && val == j; + } + + writeLatch.countDown(); + + readLatch.await(); + } + + return null; + } + }, + 1, + "reader" + ); + + IgniteFuture fut2 = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + writeLatch.await(); + + Ignite g1 = startGrid(1); + + GridCache<Integer, Integer> cache = g1.cache(null); + + for (int j = 0; j < KEY_CNT; j++) { + cache.put(j, j); + + if (j % FREQUENCY == 0) + info("Stored value in cache: " + j); + } + + done.set(true); + + for (int j = 0; j < KEY_CNT; j++) { + GridCacheEntry<Integer, Integer> entry = cache.entry(j); + + assert entry != null; + + Integer val = entry.getValue(); + + if (j % FREQUENCY == 0) + info("Read entry: " + entry.getKey() + " -> " + val); + + assert val != null && val == j; + } + + if (backups > 0) + stopGrid(1); + + readLatch.countDown(); + + return null; + } + }, + 1, + "writer" + ); + + fut1.get(); + fut2.get(); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java new file mode 100644 index 0000000..157a827 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadSelfTest.java @@ -0,0 +1,674 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.CacheConfiguration.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + */ +public class GridCacheDhtPreloadSelfTest extends GridCommonAbstractTest { + /** Flag to print preloading events. */ + private static final boolean DEBUG = false; + + /** */ + private static final long TEST_TIMEOUT = 5 * 60 * 1000; + + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Preload batch size. */ + private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups = DFLT_BACKUPS; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = ASYNC; + + /** */ + private int preloadBatchSize = DFLT_BATCH_SIZE; + + /** Number of partitions. */ + private int partitions = DFLT_PARTITIONS; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public GridCacheDhtPreloadSelfTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + cfg.setDeploymentMode(CONTINUOUS); + + return cfg; + } + + /** + * Gets cache configuration for grid with given name. + * + * @param gridName Grid name. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration(String gridName) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setPreloadBatchSize(preloadBatchSize); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setPreloadMode(preloadMode); + cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions)); + cacheCfg.setBackups(backups); + + return cacheCfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { +// resetLog4j(Level.DEBUG, true, +// // Categories. +// GridDhtPreloader.class.getPackage().getName(), +// GridDhtPartitionTopologyImpl.class.getName(), +// GridDhtLocalPartition.class.getName()); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = DFLT_PARTITIONS; + preloadMode = ASYNC; + preloadBatchSize = DFLT_BATCH_SIZE; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** + * @param cache Cache. + * @return Affinity. + */ + @SuppressWarnings({"unchecked"}) + private GridCacheAffinity<Integer> affinity(GridCache<Integer, ?> cache) { + return cache.affinity(); + } + + /** + * @param c Cache. + * @return {@code True} if synchronous preloading. + */ + private boolean isSync(GridCache<?, ?> c) { + return c.configuration().getPreloadMode() == SYNC; + } + + /** + * @throws Exception If failed. + */ + public void testActivePartitionTransferSyncSameCoordinator() throws Exception { + preloadMode = SYNC; + + checkActivePartitionTransfer(1000, 4, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testActivePartitionTransferAsyncSameCoordinator() throws Exception { + checkActivePartitionTransfer(1000, 4, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testActivePartitionTransferSyncChangingCoordinator() throws Exception { + preloadMode = SYNC; + + checkActivePartitionTransfer(1000, 4, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testActivePartitionTransferAsyncChangingCoordinator() throws Exception { + checkActivePartitionTransfer(1000, 4, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testActivePartitionTransferSyncRandomCoordinator() throws Exception { + preloadMode = SYNC; + + checkActivePartitionTransfer(1000, 4, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testActivePartitionTransferAsyncRandomCoordinator() throws Exception { + checkActivePartitionTransfer(1000, 4, false, true); + } + + /** + * @param keyCnt Key count. + * @param nodeCnt Node count. + * @param sameCoord Same coordinator flag. + * @param shuffle Shuffle flag. + * @throws Exception If failed. + */ + private void checkActivePartitionTransfer(int keyCnt, int nodeCnt, boolean sameCoord, boolean shuffle) + throws Exception { +// resetLog4j(Level.DEBUG, true, +// // Categories. +// GridDhtPreloader.class.getPackage().getName(), +// GridDhtPartitionTopologyImpl.class.getName(), +// GridDhtLocalPartition.class.getName()); + + try { + Ignite ignite1 = startGrid(0); + + GridCache<Integer, String> cache1 = ignite1.cache(null); + + putKeys(cache1, keyCnt); + checkKeys(cache1, keyCnt, F.asList(ignite1)); + + List<Ignite> ignites = new ArrayList<>(nodeCnt + 1); + + startGrids(nodeCnt, 1, ignites); + + // Check all nodes. + for (Ignite g : ignites) { + GridCache<Integer, String> c = g.cache(null); + + checkKeys(c, keyCnt, ignites); + } + + if (shuffle) + Collections.shuffle(ignites); + + if (sameCoord) + // Add last. + ignites.add(ignite1); + else + // Add first. + ignites.add(0, ignite1); + + if (!sameCoord && shuffle) + Collections.shuffle(ignites); + + checkActiveState(ignites); + + info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" + + U.grids2names(ignites) + ']'); + + Collection<IgniteFuture<?>> futs = new LinkedList<>(); + + Ignite last = F.last(ignites); + + for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) { + Ignite g = it.next(); + + if (!it.hasNext()) { + assert last == g; + + break; + } + + checkActiveState(ignites); + + final UUID nodeId = g.cluster().localNode().id(); + + it.remove(); + + futs.add(waitForLocalEvent(last.events(), new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + IgniteCachePreloadingEvent evt = (IgniteCachePreloadingEvent)e; + + ClusterNode node = evt.discoveryNode(); + + return evt.type() == EVT_CACHE_PRELOAD_STOPPED && node.id().equals(nodeId) && + evt.discoveryEventType() == EVT_NODE_LEFT; + } + }, EVT_CACHE_PRELOAD_STOPPED)); + + info("Before grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); + + stopGrid(g.name()); + + info("After grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); + + // Check all left nodes. + checkActiveState(ignites); + } + + info("Waiting for preload futures: " + F.view(futs, F.unfinishedFutures())); + + X.waitAll(futs); + + info("Finished waiting for preload futures."); + + assert last != null; + + GridCache<Integer, String> lastCache = last.cache(null); + + GridDhtCacheAdapter<Integer, String> dht = dht(lastCache); + + GridCacheAffinity<Integer> aff = affinity(lastCache); + + info("Finished waiting for all exchange futures..."); + + for (int i = 0; i < keyCnt; i++) { + if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(last.cluster().localNode())) { + GridDhtPartitionTopology<Integer, String> top = dht.topology(); + + for (GridDhtLocalPartition<Integer, String> p : top.localPartitions()) { + Collection<ClusterNode> moving = top.moving(p.id()); + + assert moving.isEmpty() : "Nodes with partition in moving state [part=" + p + + ", moving=" + moving + ']'; + + assert OWNING == p.state() : "Invalid partition state for partition [part=" + p + ", map=" + + top.partitionMap(false) + ']'; + } + } + } + + checkActiveState(ignites); + } + catch (Error | Exception e) { + error("Test failed.", e); + + throw e; + } finally { + stopAllGrids(); + } + } + + /** + * @param grids Grids. + */ + private void checkActiveState(Iterable<Ignite> grids) { + // Check that nodes don't have non-active information about other nodes. + for (Ignite g : grids) { + GridCache<Integer, String> c = g.cache(null); + + GridDhtCacheAdapter<Integer, String> dht = dht(c); + + GridDhtPartitionFullMap allParts = dht.topology().partitionMap(false); + + for (GridDhtPartitionMap parts : allParts.values()) { + if (!parts.nodeId().equals(g.cluster().localNode().id())) { + for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) { + int p = e.getKey(); + + GridDhtPartitionState state = e.getValue(); + + assert state == OWNING || state == MOVING || state == RENTING : + "Invalid state [grid=" + g.name() + ", part=" + p + ", state=" + state + + ", parts=" + parts + ']'; + + assert state.active(); + } + } + } + } + } + + /** + * @throws Exception If failed. + */ + public void testMultiplePartitionBatchesSyncPreload() throws Exception { + preloadMode = SYNC; + preloadBatchSize = 100; + partitions = 2; + + checkNodes(1000, 1, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultiplePartitionBatchesAsyncPreload() throws Exception { + preloadBatchSize = 100; + partitions = 2; + + checkNodes(1000, 1, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesSyncPreloadSameCoordinator() throws Exception { + preloadMode = SYNC; + + checkNodes(1000, 4, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesAsyncPreloadSameCoordinator() throws Exception { + checkNodes(1000, 4, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesSyncPreloadChangingCoordinator() throws Exception { + preloadMode = SYNC; + + checkNodes(1000, 4, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesAsyncPreloadChangingCoordinator() throws Exception { + checkNodes(1000, 4, false, false); + } + + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesSyncPreloadRandomCoordinator() throws Exception { + preloadMode = SYNC; + + checkNodes(1000, 4, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleNodesAsyncPreloadRandomCoordinator() throws Exception { + checkNodes(1000, 4, false, true); + } + + /** + * @param cnt Number of grids. + * @param startIdx Start node index. + * @param list List of started grids. + * @throws Exception If failed. + */ + private void startGrids(int cnt, int startIdx, Collection<Ignite> list) throws Exception { + for (int i = 0; i < cnt; i++) { + final Ignite g = startGrid(startIdx++); + + if (DEBUG) + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + info("\n>>> Preload event [grid=" + g.name() + ", evt=" + evt + ']'); + + return true; + } + }, EVTS_CACHE_PRELOAD); + + list.add(g); + } + } + + /** + * @param grids Grids to stop. + */ + private void stopGrids(Iterable<Ignite> grids) { + for (Ignite g : grids) + stopGrid(g.name()); + } + + /** + * @param keyCnt Key count. + * @param nodeCnt Node count. + * @param sameCoord Same coordinator flag. + * @param shuffle Shuffle flag. + * @throws Exception If failed. + */ + private void checkNodes(int keyCnt, int nodeCnt, boolean sameCoord, boolean shuffle) + throws Exception { +// resetLog4j(Level.DEBUG, true, +// // Categories. +// GridDhtPreloader.class.getPackage().getName(), +// GridDhtPartitionTopologyImpl.class.getName(), +// GridDhtLocalPartition.class.getName()); + + try { + Ignite ignite1 = startGrid(0); + + GridCache<Integer, String> cache1 = ignite1.cache(null); + + putKeys(cache1, keyCnt); + checkKeys(cache1, keyCnt, F.asList(ignite1)); + + List<Ignite> ignites = new ArrayList<>(nodeCnt + 1); + + startGrids(nodeCnt, 1, ignites); + + // Check all nodes. + for (Ignite g : ignites) { + GridCache<Integer, String> c = g.cache(null); + + checkKeys(c, keyCnt, ignites); + } + + if (shuffle) + Collections.shuffle(ignites); + + if (sameCoord) + // Add last. + ignites.add(ignite1); + else + // Add first. + ignites.add(0, ignite1); + + if (!sameCoord && shuffle) + Collections.shuffle(ignites); + + info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ", grids=" + + U.grids2names(ignites) + ']'); + + Ignite last = null; + + for (Iterator<Ignite> it = ignites.iterator(); it.hasNext(); ) { + Ignite g = it.next(); + + if (!it.hasNext()) { + last = g; + + break; + } + + final UUID nodeId = g.cluster().localNode().id(); + + it.remove(); + + Collection<IgniteFuture<?>> futs = new LinkedList<>(); + + for (Ignite gg : ignites) + futs.add(waitForLocalEvent(gg.events(), new P1<IgniteEvent>() { + @Override public boolean apply(IgniteEvent e) { + IgniteCachePreloadingEvent evt = (IgniteCachePreloadingEvent)e; + + ClusterNode node = evt.discoveryNode(); + + return evt.type() == EVT_CACHE_PRELOAD_STOPPED && node.id().equals(nodeId) && + evt.discoveryEventType() == EVT_NODE_LEFT; + } + }, EVT_CACHE_PRELOAD_STOPPED)); + + + info("Before grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); + + stopGrid(g.name()); + + info(">>> Waiting for preload futures [leftNode=" + g.name() + ", remaining=" + U.grids2names(ignites) + ']'); + + X.waitAll(futs); + + info("After grid stop [name=" + g.name() + ", fullTop=" + top2string(ignites)); + + // Check all left nodes. + for (Ignite gg : ignites) { + GridCache<Integer, String> c = gg.cache(null); + + checkKeys(c, keyCnt, ignites); + } + } + + assert last != null; + + GridCache<Integer, String> lastCache = last.cache(null); + + GridDhtCacheAdapter<Integer, String> dht = dht(lastCache); + + GridCacheAffinity<Integer> aff = affinity(lastCache); + + for (int i = 0; i < keyCnt; i++) { + if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(last.cluster().localNode())) { + GridDhtPartitionTopology<Integer, String> top = dht.topology(); + + for (GridDhtLocalPartition<Integer, String> p : top.localPartitions()) { + Collection<ClusterNode> moving = top.moving(p.id()); + + assert moving.isEmpty() : "Nodes with partition in moving state [part=" + p + + ", moving=" + moving + ']'; + + assert OWNING == p.state() : "Invalid partition state for partition [part=" + p + ", map=" + + top.partitionMap(false) + ']'; + } + } + } + } + catch (Error | Exception e) { + error("Test failed.", e); + + throw e; + } finally { + stopAllGrids(); + } + } + + /** + * @param c Cache. + * @param cnt Key count. + * @throws IgniteCheckedException If failed. + */ + private void putKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException { + for (int i = 0; i < cnt; i++) + c.put(i, Integer.toString(i)); + } + + /** + * @param cache Cache. + * @param cnt Key count. + * @param grids Grids. + * @throws IgniteCheckedException If failed. + */ + private void checkKeys(GridCache<Integer, String> cache, int cnt, Iterable<Ignite> grids) throws IgniteCheckedException { + GridCacheAffinity<Integer> aff = affinity(cache); + + Ignite ignite = cache.gridProjection().ignite(); + + ClusterNode loc = ignite.cluster().localNode(); + + boolean sync = cache.configuration().getPreloadMode() == SYNC; + + for (int i = 0; i < cnt; i++) { + Collection<ClusterNode> nodes = ignite.cluster().nodes(); + + Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(i)); + + assert !affNodes.isEmpty(); + + if (affNodes.contains(loc)) { + String val = sync ? cache.peek(i) : cache.get(i); + + ClusterNode primaryNode = F.first(affNodes); + + assert primaryNode != null; + + boolean primary = primaryNode.equals(loc); + + assert Integer.toString(i).equals(val) : "Key check failed [grid=" + ignite.name() + + ", cache=" + cache.name() + ", key=" + i + ", expected=" + i + ", actual=" + val + + ", part=" + aff.partition(i) + ", primary=" + primary + ", affNodes=" + U.nodeIds(affNodes) + + ", locId=" + loc.id() + ", allNodes=" + U.nodeIds(nodes) + ", allParts=" + top2string(grids) + ']'; + } + } + } + + /** + * @param grids Grids + * @return String representation of all partitions and their state. + */ + @SuppressWarnings( {"ConstantConditions"}) + private String top2string(Iterable<Ignite> grids) { + Map<String, String> map = new HashMap<>(); + + for (Ignite g : grids) { + GridCache<Integer, String> c = g.cache(null); + + GridDhtCacheAdapter<Integer, String> dht = dht(c); + + GridDhtPartitionFullMap fullMap = dht.topology().partitionMap(false); + + map.put(g.name(), DEBUG ? fullMap.toFullString() : fullMap.toString()); + } + + return "Grid partition maps [" + map.toString() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java new file mode 100644 index 0000000..15b0145 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadStartStopSelfTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheConfiguration.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + */ +public class GridCacheDhtPreloadStartStopSelfTest extends GridCommonAbstractTest { + /** */ + private static final long TEST_TIMEOUT = 5 * 60 * 1000; + + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Preload batch size. */ + private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE; + + /** Default cache count. */ + private static final int DFLT_CACHE_CNT = 10; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups = DFLT_BACKUPS; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = ASYNC; + + /** */ + private int preloadBatchSize = DFLT_BATCH_SIZE; + + /** Number of partitions. */ + private int partitions = DFLT_PARTITIONS; + + /** */ + private int cacheCnt = DFLT_CACHE_CNT; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public GridCacheDhtPreloadStartStopSelfTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration[] cacheCfgs = new CacheConfiguration[cacheCnt]; + + for (int i = 0; i < cacheCnt; i++) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName("partitioned-" + i); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setPreloadBatchSize(preloadBatchSize); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setPreloadMode(preloadMode); + cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions)); + cacheCfg.setBackups(backups); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + cacheCfgs[i] = cacheCfg; + } + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + cfg.setCacheConfiguration(cacheCfgs); + cfg.setDeploymentMode(CONTINUOUS); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = DFLT_PARTITIONS; + preloadMode = ASYNC; + preloadBatchSize = DFLT_BATCH_SIZE; + cacheCnt = DFLT_CACHE_CNT; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** + * @param cache Cache. + * @return Affinity. + */ + private GridCacheAffinity<Integer> affinity(GridCache<Integer, ?> cache) { + return cache.affinity(); + } + + /** + * @param c Cache. + * @return {@code True} if synchronoous preloading. + */ + private boolean isSync(GridCache<?, ?> c) { + return c.configuration().getPreloadMode() == SYNC; + } + + /** + * @param cnt Number of grids. + * @param startIdx Start node index. + * @param list List of started grids. + * @throws Exception If failed. + */ + private void startGrids(int cnt, int startIdx, Collection<Ignite> list) throws Exception { + for (int i = 0; i < cnt; i++) + list.add(startGrid(startIdx++)); + } + + /** @param grids Grids to stop. */ + private void stopGrids(Iterable<Ignite> grids) { + for (Ignite g : grids) + stopGrid(g.name()); + } + + /** @throws Exception If failed. */ + public void testDeadlock() throws Exception { + info("Testing deadlock..."); + + Collection<Ignite> ignites = new LinkedList<>(); + + int gridCnt = 3; + + startGrids(gridCnt, 1, ignites); + + info("Grids started: " + gridCnt); + + stopGrids(ignites); + } + + /** + * @param keyCnt Key count. + * @param nodeCnt Node count. + * @throws Exception If failed. + */ + private void checkNodes(int keyCnt, int nodeCnt) throws Exception { + try { + Ignite g1 = startGrid(0); + + GridCache<Integer, String> c1 = g1.cache(null); + + putKeys(c1, keyCnt); + checkKeys(c1, keyCnt); + + Collection<Ignite> ignites = new LinkedList<>(); + + startGrids(nodeCnt, 1, ignites); + + // Check all nodes. + for (Ignite g : ignites) { + GridCache<Integer, String> c = g.cache(null); + + checkKeys(c, keyCnt); + } + + info(">>> Finished checking nodes [keyCnt=" + keyCnt + ", nodeCnt=" + nodeCnt + ']'); + + stopGrids(ignites); + + GridDhtCacheAdapter<Integer, String> dht = dht(c1); + + info(">>> Waiting for preload futures..."); + + GridCachePartitionExchangeManager<Object, Object> exchMgr + = ((GridKernal)g1).context().cache().context().exchange(); + + // Wait for exchanges to complete. + for (IgniteFuture<?> fut : exchMgr.exchangeFutures()) + fut.get(); + + GridCacheAffinity<Integer> aff = affinity(c1); + + for (int i = 0; i < keyCnt; i++) { + if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(g1.cluster().localNode())) { + GridDhtPartitionTopology<Integer, String> top = dht.topology(); + + for (GridDhtLocalPartition<Integer, String> p : top.localPartitions()) + assertEquals("Invalid partition state for partition: " + p, OWNING, p.state()); + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param c Cache. + * @param cnt Key count. + * @throws IgniteCheckedException If failed. + */ + private void putKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException { + for (int i = 0; i < cnt; i++) + c.put(i, Integer.toString(i)); + } + + /** + * @param c Cache. + * @param cnt Key count. + * @throws IgniteCheckedException If failed. + */ + private void checkKeys(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException { + GridCacheAffinity<Integer> aff = affinity(c); + + boolean sync = isSync(c); + + Ignite ignite = c.gridProjection().ignite(); + + for (int i = 0; i < cnt; i++) { + if (aff.mapPartitionToPrimaryAndBackups(aff.partition(i)).contains(ignite.cluster().localNode())) { + String val = sync ? c.peek(i) : c.get(i); + + assertEquals("Key check failed [grid=" + ignite.name() + ", cache=" + c.name() + ", key=" + i + ']', + Integer.toString(i), val); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java new file mode 100644 index 0000000..a9732e8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadUnloadSelfTest.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheConfiguration.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Test large cache counts. + */ +@SuppressWarnings({"BusyWait"}) +public class GridCacheDhtPreloadUnloadSelfTest extends GridCommonAbstractTest { + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Preload batch size. */ + private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups = DFLT_BACKUPS; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = ASYNC; + + /** */ + private int preloadBatchSize = DFLT_BATCH_SIZE; + + /** Number of partitions. */ + private int partitions = DFLT_PARTITIONS; + + /** */ + private LifecycleBean lbean; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Network timeout. */ + private long netTimeout = 1000; + + /** + * + */ + public GridCacheDhtPreloadUnloadSelfTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setPreloadBatchSize(preloadBatchSize); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setPreloadMode(preloadMode); + cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions)); + cc.setBackups(backups); + cc.setAtomicityMode(TRANSACTIONAL); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + if (lbean != null) + c.setLifecycleBeans(lbean); + + c.setDiscoverySpi(disco); + c.setCacheConfiguration(cc); + c.setDeploymentMode(CONTINUOUS); + c.setNetworkTimeout(netTimeout); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = DFLT_PARTITIONS; + preloadMode = ASYNC; + preloadBatchSize = DFLT_BATCH_SIZE; + netTimeout = 1000; + } + + /** @throws Exception If failed. */ + public void testUnloadZeroBackupsTwoNodes() throws Exception { + preloadMode = SYNC; + backups = 0; + netTimeout = 500; + + try { + startGrid(0); + + int cnt = 1000; + + populate(grid(0).<Integer, String>cache(null), cnt); + + int gridCnt = 2; + + for (int i = 1; i < gridCnt; i++) + startGrid(i); + + long wait = 3000; + + waitForUnload(gridCnt, cnt, wait); + } + finally { + stopAllGrids(); + } + } + + /** @throws Exception If failed. */ + public void testUnloadOneBackupTwoNodes() throws Exception { + preloadMode = SYNC; + backups = 1; + netTimeout = 500; + + try { + startGrid(0); + + int cnt = 1000; + + populate(grid(0).<Integer, String>cache(null), cnt); + + int gridCnt = 2; + + for (int i = 1; i < gridCnt; i++) + startGrid(i); + + long wait = 2000; + + info("Sleeping for " + wait + "ms"); + + // Unfortunately there is no other way but sleep. + Thread.sleep(wait); + + for (int i = 0; i < gridCnt; i++) + info("Grid size [i=" + i + ", size=" + grid(i).cache(null).size() + ']'); + + for (int i = 0; i < gridCnt; i++) { + GridCache<Integer, String> c = grid(i).cache(null); + + // Nothing should be unloaded since nodes are backing up each other. + assert c.size() == cnt; + } + } + finally { + stopAllGrids(); + } + } + + /** + * + * @param gridCnt Grid count. + * @param cnt Count. + * @param wait Wait. + * @throws InterruptedException If interrupted. + */ + private void waitForUnload(long gridCnt, long cnt, long wait) throws InterruptedException { + info("Waiting for preloading to complete for " + wait + "ms..."); + + long endTime = System.currentTimeMillis() + wait; + + while (System.currentTimeMillis() < endTime) { + boolean err = false; + + for (int i = 0; i < gridCnt; i++) { + GridCache<Integer, String> c = grid(i).cache(null); + + if (c.size() >= cnt) + err = true; + } + + if (!err) + break; + else + Thread.sleep(500); + } + + for (int i = 0; i < gridCnt; i++) + info("Grid size [i=" + i + ", size=" + grid(i).cache(null).size() + ']'); + + for (int i = 0; i < gridCnt; i++) { + GridCache<Integer, String> c = grid(i).cache(null); + + assert c.size() < cnt; + } + } + + /** @throws Exception If failed. */ + public void testUnloadOneBackupThreeNodes() throws Exception { + preloadMode = SYNC; + backups = 1; + netTimeout = 500; + partitions = 23; + + try { + startGrid(0); + + int cnt = 1000; + + populate(grid(0).<Integer, String>cache(null), cnt); + + int gridCnt = 3; + + for (int i = 1; i < gridCnt; i++) { + startGrid(i); + + for (int j = 0; j <= i; j++) + info("Grid size [i=" + i + ", size=" + grid(j).cache(null).size() + ']'); + } + + long wait = 3000; + + waitForUnload(gridCnt, cnt, wait); + } + finally { + stopAllGrids(); + } + } + + /** @throws Exception If failed. */ + public void testUnloadOneBackThreeNodesWithLifeCycleBean() throws Exception { + preloadMode = SYNC; + backups = 1; + + try { + final int cnt = 1000; + + lbean = new LifecycleBean() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteCheckedException { + if (evt == LifecycleEventType.AFTER_GRID_START) { + GridCache<Integer, String> c = ignite.cache(null); + + if (c.putxIfAbsent(-1, "true")) { + populate(ignite.<Integer, String>cache(null), cnt); + + info(">>> POPULATED GRID <<<"); + } + } + } + }; + + int gridCnt = 3; + + for (int i = 0; i < gridCnt; i++) { + startGrid(i); + + for (int j = 0; j < i; j++) + info("Grid size [i=" + i + ", size=" + grid(j).cache(null).size() + ']'); + } + + long wait = 3000; + + waitForUnload(gridCnt, cnt, wait); + } + finally { + lbean = null; + + stopAllGrids(); + } + } + + /** + * @param c Cache. + * @param cnt Key count. + * @throws IgniteCheckedException If failed. + */ + private void populate(GridCache<Integer, String> c, int cnt) throws IgniteCheckedException { + for (int i = 0; i < cnt; i++) + c.put(i, value(1024)); + } + + /** + * @param size Size. + * @return Value. + */ + private String value(int size) { + StringBuilder b = new StringBuilder(size / 2 + 1); + + for (int i = 0; i < size / 3; i++) + b.append('a' + (i % 26)); + + return b.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java new file mode 100644 index 0000000..d501d96 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtRemoveFailureTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests that removes are not lost when topology changes. + */ +public class GridCacheDhtRemoveFailureTest extends GridCacheAbstractRemoveFailureTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setBackups(1); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java new file mode 100644 index 0000000..4da11a9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTestUtils.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.lang.reflect.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Utility methods for dht preloader testing. + */ +public class GridCacheDhtTestUtils { + /** + * Ensure singleton. + */ + private GridCacheDhtTestUtils() { + // No-op. + } + + /** + * @param cache Cache. + * @return Dht cache. + */ + static <K, V> GridDhtCacheAdapter<K, V> dht(GridCacheProjection<K, V> cache) { + return ((GridNearCacheAdapter<K, V>)cache.<K, V>cache()).dht(); + } + + /** + * @param dht Cache. + * @param keyCnt Number of test keys to put into cache. + * @throws IgniteCheckedException If failed to prepare. + */ + @SuppressWarnings({"UnusedAssignment", "unchecked"}) + static void prepareKeys(GridDhtCache<Integer, String> dht, int keyCnt) throws IgniteCheckedException { + GridCacheAffinityFunction aff = dht.context().config().getAffinity(); + + GridCacheConcurrentMap<Integer, String> cacheMap; + + try { + Field field = GridCacheAdapter.class.getDeclaredField("map"); + + field.setAccessible(true); + + cacheMap = (GridCacheConcurrentMap<Integer, String>)field.get(dht); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to get cache map.", e); + } + + GridDhtPartitionTopology<Integer,String> top = dht.topology(); + + for (int i = 0; i < keyCnt; i++) { + cacheMap.putEntry(-1, i, "value" + i, 0); + + dht.preloader().request(Collections.singleton(i), -1); + + GridDhtLocalPartition part = top.localPartition(aff.partition(i), false); + + assert part != null; + + part.own(); + } + } + + /** + * @param cache Dht cache. + */ + static void printAffinityInfo(GridCache<?, ?> cache) { + GridCacheConsistentHashAffinityFunction aff = + (GridCacheConsistentHashAffinityFunction)cache.configuration().getAffinity(); + + System.out.println("Affinity info."); + System.out.println("----------------------------------"); + System.out.println("Number of key backups: " + cache.configuration().getBackups()); + System.out.println("Number of cache partitions: " + aff.getPartitions()); + } + + /** + * @param dht Dht cache. + * @param idx Cache index + */ + static void printDhtTopology(GridDhtCache<Integer, String> dht, int idx) { + final GridCacheAffinity<Integer> aff = dht.affinity(); + + Ignite ignite = dht.context().grid(); + ClusterNode locNode = ignite.cluster().localNode(); + + GridDhtPartitionTopology<Integer, String> top = dht.topology(); + + System.out.println("\nTopology of cache #" + idx + " (" + locNode.id() + ")" + ":"); + System.out.println("----------------------------------"); + + List<Integer> affParts = new LinkedList<>(); + + GridDhtPartitionMap map = dht.topology().partitions(locNode.id()); + + if (map != null) + for (int p : map.keySet()) + affParts.add(p); + + Collections.sort(affParts); + + System.out.println("Affinity partitions: " + affParts + "\n"); + + List<GridDhtLocalPartition> locals = new ArrayList<GridDhtLocalPartition>(top.localPartitions()); + + Collections.sort(locals); + + for (final GridDhtLocalPartition part : locals) { + Collection<ClusterNode> partNodes = aff.mapKeyToPrimaryAndBackups(part.id()); + + String ownStr = !partNodes.contains(dht.context().localNode()) ? "NOT AN OWNER" : + F.eqNodes(CU.primary(partNodes), locNode) ? "PRIMARY" : "BACKUP"; + + Collection<Integer> keys = F.viewReadOnly(dht.keySet(), F.<Integer>identity(), new P1<Integer>() { + @Override public boolean apply(Integer k) { + return aff.partition(k) == part.id(); + } + }); + + System.out.println("Local partition: [" + part + "], [owning=" + ownStr + ", keyCnt=" + keys.size() + + ", keys=" + keys + "]"); + } + + System.out.println("\nNode map:"); + + for (Map.Entry<UUID, GridDhtPartitionMap> e : top.partitionMap(false).entrySet()) { + List<Integer> list = new ArrayList<>(e.getValue().keySet()); + + Collections.sort(list); + + System.out.println("[node=" + e.getKey() + ", parts=" + list + "]"); + } + + System.out.println(""); + } + + /** + * Checks consistency of partitioned cache. + * Any preload processes must be finished before this method call(). + * + * @param dht Dht cache. + * @param idx Cache index. + * @param log Logger. + */ + static void checkDhtTopology(GridDhtCache<Integer, String> dht, int idx, IgniteLogger log) { + assert dht != null; + assert idx >= 0; + assert log != null; + + log.info("Checking balanced state of cache #" + idx); + + GridCacheAffinity<Integer> aff = dht.affinity(); + + Ignite ignite = dht.context().grid(); + ClusterNode locNode = ignite.cluster().localNode(); + + GridDhtPartitionTopology<Integer,String> top = dht.topology(); + + // Expected partitions calculated with affinity function. + // They should be in topology in OWNING state. + Collection<Integer> affParts = new HashSet<>(); + + GridDhtPartitionMap map = dht.topology().partitions(locNode.id()); + + if (map != null) + for (int p : map.keySet()) + affParts.add(p); + + if (F.isEmpty(affParts)) + return; + + for (int p : affParts) + assert top.localPartition(p, false) != null : + "Partition does not exist in topology: [cache=" + idx + ", part=" + p + "]"; + + for (GridDhtLocalPartition p : top.localPartitions()) { + assert affParts.contains(p.id()) : + "Invalid local partition: [cache=" + idx + ", part=" + p + ", node partitions=" + affParts + "]"; + + assert p.state() == OWNING : "Invalid partition state [cache=" + idx + ", part=" + p + "]"; + + Collection<ClusterNode> partNodes = aff.mapPartitionToPrimaryAndBackups(p.id()); + + assert partNodes.contains(locNode) : + "Partition affinity nodes does not contain local node: [cache=" + idx + "]"; + } + + // Check keys. + for (GridCacheEntryEx<Integer, String> e : dht.entries()) { + GridDhtCacheEntry<Integer, String> entry = (GridDhtCacheEntry<Integer, String>)e; + + if (!affParts.contains(entry.partition())) + log.warning("Partition of stored entry is obsolete for node: [cache=" + idx + ", entry=" + entry + + ", node partitions=" + affParts + "]"); + + int p = aff.partition(entry.key()); + + if (!affParts.contains(p)) + log.warning("Calculated entry partition is not in node partitions: [cache=" + idx + ", part=" + p + + ", entry=" + entry + ", node partitions=" + affParts + "]"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java new file mode 100644 index 0000000..76d34e3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtTxPreloadSelfTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests cache transaction during preloading. + */ +public class GridCacheDhtTxPreloadSelfTest extends IgniteTxPreloadAbstractTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setBackups(4); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java new file mode 100644 index 0000000..48c0765 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheExColocatedFullApiSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests private cache interface on colocated cache. + */ +public class GridCacheExColocatedFullApiSelfTest extends GridCacheExAbstractFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } +}