Repository: ignite Updated Branches: refs/heads/master 84a7b595f -> a393e6962
http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java new file mode 100644 index 0000000..9b9e5d7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAbstractDynamicCacheStartFailTest.java @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; +import javax.cache.configuration.Factory; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Tests the recovery after a dynamic cache start failure. + */ +public abstract class IgniteAbstractDynamicCacheStartFailTest extends GridCacheAbstractSelfTest { + /** */ + private static final String DYNAMIC_CACHE_NAME = "TestDynamicCache"; + + /** */ + private static final String CLIENT_GRID_NAME = "client"; + + /** */ + protected static final String EXISTING_CACHE_NAME = "existing-cache";; + + /** */ + private static final int PARTITION_COUNT = 16; + + /** Coordinator node index. */ + private int crdIdx = 0; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * Returns {@code true} if persistence is enabled. + * + * @return {@code true} if persistence is enabled. + */ + protected boolean persistenceEnabled() { + return false; + } + + /** + * @throws Exception If failed. + */ + public void testBrokenAffinityFunStartOnServerFailedOnClient() throws Exception { + final String clientName = CLIENT_GRID_NAME + "testBrokenAffinityFunStartOnServerFailedOnClient"; + + IgniteConfiguration clientCfg = getConfiguration(clientName); + + clientCfg.setClientMode(true); + + Ignite client = startGrid(clientName, clientCfg); + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(DYNAMIC_CACHE_NAME + "-server-1"); + + cfg.setAffinity(new BrokenAffinityFunction(false, clientName)); + + try { + IgniteCache cache = ignite(0).getOrCreateCache(cfg); + } + catch (CacheException e) { + fail("Exception should not be thrown."); + } + + stopGrid(clientName); + } + + /** + * @throws Exception If failed. + */ + public void testBrokenAffinityFunStartOnServerFailedOnServer() throws Exception { + final String clientName = CLIENT_GRID_NAME + "testBrokenAffinityFunStartOnServerFailedOnServer"; + + IgniteConfiguration clientCfg = getConfiguration(clientName); + + clientCfg.setClientMode(true); + + Ignite client = startGrid(clientName, clientCfg); + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(DYNAMIC_CACHE_NAME + "-server-2"); + + cfg.setAffinity(new BrokenAffinityFunction(false, getTestIgniteInstanceName(0))); + + try { + IgniteCache cache = ignite(0).getOrCreateCache(cfg); + + fail("Expected exception was not thrown."); + } + catch (CacheException e) { + } + + stopGrid(clientName); + } + + /** + * @throws Exception If failed. + */ + public void testBrokenAffinityFunStartOnClientFailOnServer() throws Exception { + final String clientName = CLIENT_GRID_NAME + "testBrokenAffinityFunStartOnClientFailOnServer"; + + IgniteConfiguration clientCfg = getConfiguration(clientName); + + clientCfg.setClientMode(true); + + Ignite client = startGrid(clientName, clientCfg); + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(DYNAMIC_CACHE_NAME + "-client-2"); + + cfg.setAffinity(new BrokenAffinityFunction(false, getTestIgniteInstanceName(0))); + + try { + IgniteCache cache = client.getOrCreateCache(cfg); + + fail("Expected exception was not thrown."); + } + catch (CacheException e) { + } + + stopGrid(clientName); + } + + /** + * Test cache start with broken affinity function that throws an exception on all nodes. + */ + public void testBrokenAffinityFunOnAllNodes() { + final boolean failOnAllNodes = true; + final int unluckyNode = 0; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = 0; + + testDynamicCacheStart( + createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Test cache start with broken affinity function that throws an exception on initiator node. + */ + public void testBrokenAffinityFunOnInitiator() { + final boolean failOnAllNodes = false; + final int unluckyNode = 1; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = 1; + + testDynamicCacheStart( + createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Test cache start with broken affinity function that throws an exception on non-initiator node. + */ + public void testBrokenAffinityFunOnNonInitiator() { + final boolean failOnAllNodes = false; + final int unluckyNode = 1; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = 2; + + testDynamicCacheStart( + createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Test cache start with broken affinity function that throws an exception on coordinator node. + */ + public void testBrokenAffinityFunOnCoordinatorDiffInitiator() { + final boolean failOnAllNodes = false; + final int unluckyNode = crdIdx; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = (crdIdx + 1) % gridCount(); + + testDynamicCacheStart( + createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Test cache start with broken affinity function that throws an exception on initiator node. + */ + public void testBrokenAffinityFunOnCoordinator() { + final boolean failOnAllNodes = false; + final int unluckyNode = crdIdx; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = crdIdx; + + testDynamicCacheStart( + createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Tests cache start with node filter and broken affinity function that throws an exception on initiator node. + */ + public void testBrokenAffinityFunWithNodeFilter() { + final boolean failOnAllNodes = false; + final int unluckyNode = 0; + final int unluckyCfg = 0; + final int numOfCaches = 1; + final int initiator = 0; + + testDynamicCacheStart( + createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, true), + initiator); + } + + /** + * Tests cache start with broken cache store that throws an exception on all nodes. + */ + public void testBrokenCacheStoreOnAllNodes() { + final boolean failOnAllNodes = true; + final int unluckyNode = 0; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = 0; + + testDynamicCacheStart( + createCacheConfigsWithBrokenCacheStore( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Tests cache start with broken cache store that throws an exception on initiator node. + */ + public void testBrokenCacheStoreOnInitiator() { + final boolean failOnAllNodes = false; + final int unluckyNode = 1; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = 1; + + testDynamicCacheStart( + createCacheConfigsWithBrokenCacheStore( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Tests cache start with broken cache store that throws an exception on non-initiator node. + */ + public void testBrokenCacheStoreOnNonInitiator() { + final boolean failOnAllNodes = false; + final int unluckyNode = 1; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = 2; + + testDynamicCacheStart( + createCacheConfigsWithBrokenCacheStore( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Tests cache start with broken cache store that throws an exception on initiator node. + */ + public void testBrokenCacheStoreOnCoordinatorDiffInitiator() { + final boolean failOnAllNodes = false; + final int unluckyNode = crdIdx; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = (crdIdx + 1) % gridCount(); + + testDynamicCacheStart( + createCacheConfigsWithBrokenCacheStore( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Tests cache start with broken cache store that throws an exception on coordinator node. + */ + public void testBrokenCacheStoreFunOnCoordinator() { + final boolean failOnAllNodes = false; + final int unluckyNode = crdIdx; + final int unluckyCfg = 1; + final int numOfCaches = 3; + final int initiator = crdIdx; + + testDynamicCacheStart( + createCacheConfigsWithBrokenCacheStore( + failOnAllNodes, unluckyNode, unluckyCfg, numOfCaches, false), + initiator); + } + + /** + * Tests multiple creation of cache with broken affinity function. + */ + public void testCreateCacheMultipleTimes() { + final boolean failOnAllNodes = false; + final int unluckyNode = 1; + final int unluckyCfg = 0; + final int numOfAttempts = 15; + + CacheConfiguration cfg = createCacheConfigsWithBrokenAffinityFun( + failOnAllNodes, unluckyNode, unluckyCfg, 1, false).get(0); + + for (int i = 0; i < numOfAttempts; ++i) { + try { + IgniteCache cache = ignite(0).getOrCreateCache(cfg); + + fail("Expected exception was not thrown"); + } + catch (CacheException e) { + } + } + } + + /** + * Tests that a cache with the same name can be started after failure if cache configuration is corrected. + * + * @throws Exception If test failed. + */ + public void testCacheStartAfterFailure() throws Exception { + CacheConfiguration cfg = createCacheConfigsWithBrokenAffinityFun( + false, 1, 0, 1, false).get(0); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + grid(0).getOrCreateCache(cfg); + return null; + } + }, CacheException.class, null); + + // Correct the cache configuration. Default constructor creates a good affinity function. + cfg.setAffinity(new BrokenAffinityFunction()); + + IgniteCache<Integer, Value> cache = grid(0).getOrCreateCache(createCacheConfiguration(EXISTING_CACHE_NAME)); + + checkCacheOperations(cache); + } + + /** + * Tests that other cache (existed before the failed start) is still operable after the failure. + * + * @throws Exception If test failed. + */ + public void testExistingCacheAfterFailure() throws Exception { + IgniteCache<Integer, Value> cache = grid(0).getOrCreateCache(createCacheConfiguration(EXISTING_CACHE_NAME)); + + CacheConfiguration cfg = createCacheConfigsWithBrokenAffinityFun( + false, 1, 0, 1, false).get(0); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + grid(0).getOrCreateCache(cfg); + return null; + } + }, CacheException.class, null); + + checkCacheOperations(cache); + } + + /** + * Tests that other cache works as expected after the failure and further topology changes. + * + * @throws Exception If test failed. + */ + public void testTopologyChangesAfterFailure() throws Exception { + final String clientName = "testTopologyChangesAfterFailure"; + + IgniteCache<Integer, Value> cache = grid(0).getOrCreateCache(createCacheConfiguration(EXISTING_CACHE_NAME)); + + checkCacheOperations(cache); + + CacheConfiguration cfg = createCacheConfigsWithBrokenAffinityFun( + false, 0, 0, 1, false).get(0); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + grid(0).getOrCreateCache(cfg); + return null; + } + }, CacheException.class, null); + + awaitPartitionMapExchange(); + + checkCacheOperations(cache); + + // Start a new server node and check cache operations. + Ignite serverNode = startGrid(gridCount() + 1); + + if (persistenceEnabled()) { + List<BaselineNode> baseline = new ArrayList<>(grid(0).cluster().currentBaselineTopology()); + + baseline.add(serverNode.cluster().localNode()); + + grid(0).cluster().setBaselineTopology(baseline); + } + + awaitPartitionMapExchange(); + + checkCacheOperations(serverNode.cache(EXISTING_CACHE_NAME)); + + // Start a new client node and check cache operations. + IgniteConfiguration clientCfg = getConfiguration(clientName); + + clientCfg.setClientMode(true); + + Ignite clientNode = startGrid(clientName, clientCfg); + + checkCacheOperations(clientNode.cache(EXISTING_CACHE_NAME)); + } + + public void testConcurrentClientNodeJoins() throws Exception { + final int clientCnt = 3; + final int numberOfAttempts = 5; + + IgniteCache<Integer, Value> cache = grid(0).getOrCreateCache(createCacheConfiguration(EXISTING_CACHE_NAME)); + + final AtomicInteger attemptCnt = new AtomicInteger(); + final CountDownLatch stopLatch = new CountDownLatch(clientCnt); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + String clientName = Thread.currentThread().getName(); + + try { + for (int i = 0; i < numberOfAttempts; ++i) { + int uniqueCnt = attemptCnt.getAndIncrement(); + + IgniteConfiguration clientCfg = getConfiguration(clientName + uniqueCnt); + + clientCfg.setClientMode(true); + + final Ignite clientNode = startGrid(clientName, clientCfg); + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(clientName + uniqueCnt); + + String instanceName = getTestIgniteInstanceName(uniqueCnt % gridCount()); + + cfg.setAffinity(new BrokenAffinityFunction(false, instanceName)); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientNode.getOrCreateCache(cfg); + return null; + } + }, CacheException.class, null); + + stopGrid(clientName, true); + } + } + catch (Exception e) { + fail("Unexpected exception: " + e.getMessage()); + } + finally { + stopLatch.countDown(); + } + + return null; + } + }, clientCnt, "start-client-thread"); + + stopLatch.await(); + + assertEquals(numberOfAttempts * clientCnt, attemptCnt.get()); + + checkCacheOperations(cache); + } + + protected void testDynamicCacheStart(final Collection<CacheConfiguration> cfgs, final int initiatorId) { + assert initiatorId < gridCount(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + grid(initiatorId).getOrCreateCaches(cfgs); + return null; + } + }, CacheException.class, null); + + for (CacheConfiguration cfg: cfgs) { + IgniteCache cache = grid(initiatorId).cache(cfg.getName()); + + assertNull(cache); + } + } + + /** + * Creates new cache configuration with the given name. + * + * @param cacheName Cache name. + * @return New cache configuration. + */ + protected CacheConfiguration createCacheConfiguration(String cacheName) { + CacheConfiguration cfg = new CacheConfiguration() + .setName(cacheName) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setAffinity(new BrokenAffinityFunction()); + + return cfg; + } + + /** + * Create list of cache configurations. + * + * @param failOnAllNodes {@code true} if affinity function should be broken on all nodes. + * @param unluckyNode Node, where exception is raised. + * @param unluckyCfg Unlucky cache configuration number. + * @param cacheNum Number of caches. + * @param useFilter {@code true} if NodeFilter should be used. + * + * @return List of cache configurations. + */ + protected List<CacheConfiguration> createCacheConfigsWithBrokenAffinityFun( + boolean failOnAllNodes, + int unluckyNode, + final int unluckyCfg, + int cacheNum, + boolean useFilter + ) { + assert unluckyCfg >= 0 && unluckyCfg < cacheNum; + + final UUID uuid = ignite(unluckyNode).cluster().localNode().id(); + + List<CacheConfiguration> cfgs = new ArrayList<>(); + + for (int i = 0; i < cacheNum; ++i) { + CacheConfiguration cfg = createCacheConfiguration(DYNAMIC_CACHE_NAME + "-" + i); + + if (i == unluckyCfg) + cfg.setAffinity(new BrokenAffinityFunction(failOnAllNodes, getTestIgniteInstanceName(unluckyNode))); + + if (useFilter) + cfg.setNodeFilter(new NodeFilter(uuid)); + + cfgs.add(cfg); + } + + return cfgs; + } + + /** + * Create list of cache configurations. + * + * @param failOnAllNodes {@code true} if cache store should be broken on all nodes. + * @param unluckyNode Node, where exception is raised. + * @param unluckyCfg Unlucky cache configuration number. + * @param cacheNum Number of caches. + * @param useFilter {@code true} if NodeFilter should be used. + * + * @return List of cache configurations. + */ + protected List<CacheConfiguration> createCacheConfigsWithBrokenCacheStore( + boolean failOnAllNodes, + int unluckyNode, + int unluckyCfg, + int cacheNum, + boolean useFilter + ) { + assert unluckyCfg >= 0 && unluckyCfg < cacheNum; + + final UUID uuid = ignite(unluckyNode).cluster().localNode().id(); + + List<CacheConfiguration> cfgs = new ArrayList<>(); + + for (int i = 0; i < cacheNum; ++i) { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(DYNAMIC_CACHE_NAME + "-" + i); + + if (i == unluckyCfg) + cfg.setCacheStoreFactory(new BrokenStoreFactory(failOnAllNodes, getTestIgniteInstanceName(unluckyNode))); + + if (useFilter) + cfg.setNodeFilter(new NodeFilter(uuid)); + + cfgs.add(cfg); + } + + return cfgs; + } + + /** + * Test the basic cache operations. + * + * @param cache Cache. + * @throws Exception If test failed. + */ + protected void checkCacheOperations(IgniteCache<Integer, Value> cache) throws Exception { + int cnt = 1000; + + // Check cache operations. + for (int i = 0; i < cnt; ++i) + cache.put(i, new Value(i)); + + for (int i = 0; i < cnt; ++i) { + Value v = cache.get(i); + + assertNotNull(v); + assertEquals(i, v.getValue()); + } + + // Check Data Streamer functionality. + try (IgniteDataStreamer<Integer, Value> streamer = grid(0).dataStreamer(cache.getName())) { + for (int i = 0; i < 10_000; ++i) + streamer.addData(i, new Value(i)); + } + } + + /** + * + */ + public static class Value { + @QuerySqlField + private final int fieldVal; + + public Value(int fieldVal) { + this.fieldVal = fieldVal; + } + + public int getValue() { + return fieldVal; + } + } + + /** + * Filter specifying on which node the cache should be started. + */ + public static class NodeFilter implements IgnitePredicate<ClusterNode> { + /** Cache should be created node with certain UUID. */ + public UUID uuid; + + /** + * @param uuid node ID. + */ + public NodeFilter(UUID uuid) { + this.uuid = uuid; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return clusterNode.id().equals(uuid); + } + } + + /** + * Affinity function that throws an exception when affinity nodes are calculated on the given node. + */ + public static class BrokenAffinityFunction extends RendezvousAffinityFunction { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** Exception should arise on all nodes. */ + private boolean eOnAllNodes = false; + + /** Exception should arise on node with certain name. */ + private String gridName; + + /** + * Constructs a good affinity function. + */ + public BrokenAffinityFunction() { + super(false, PARTITION_COUNT); + // No-op. + } + + /** + * @param eOnAllNodes {@code True} if exception should be thrown on all nodes. + * @param gridName Exception should arise on node with certain name. + */ + public BrokenAffinityFunction(boolean eOnAllNodes, String gridName) { + super(false, PARTITION_COUNT); + + this.eOnAllNodes = eOnAllNodes; + this.gridName = gridName; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + if (eOnAllNodes || ignite.name().equals(gridName)) + throw new IllegalStateException("Simulated exception [locNodeId=" + + ignite.cluster().localNode().id() + "]"); + else + return super.assignPartitions(affCtx); + } + } + + /** + * Factory that throws an exception is got created. + */ + public static class BrokenStoreFactory implements Factory<CacheStore<Integer, String>> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** Exception should arise on all nodes. */ + boolean eOnAllNodes = true; + + /** Exception should arise on node with certain name. */ + public static String gridName; + + /** + * @param eOnAllNodes {@code True} if exception should be thrown on all nodes. + * @param gridName Exception should arise on node with certain name. + */ + public BrokenStoreFactory(boolean eOnAllNodes, String gridName) { + this.eOnAllNodes = eOnAllNodes; + + this.gridName = gridName; + } + + /** {@inheritDoc} */ + @Override public CacheStore<Integer, String> create() { + if (eOnAllNodes || ignite.name().equals(gridName)) + throw new IllegalStateException("Simulated exception [locNodeId=" + + ignite.cluster().localNode().id() + "]"); + else + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java new file mode 100644 index 0000000..36e1879 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartCoordinatorFailoverTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridJobExecuteResponse; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +public class IgniteDynamicCacheStartCoordinatorFailoverTest extends GridCommonAbstractTest { + /** Default IP finder for single-JVM cloud grid. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Latch which blocks DynamicCacheChangeFailureMessage until main thread has sent node fail signal. */ + private static volatile CountDownLatch latch; + + /** */ + private static final String COORDINATOR_ATTRIBUTE = "coordinator"; + + /** Client mode flag. */ + private Boolean appendCustomAttribute; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + latch = new CountDownLatch(1); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + TcpCommunicationSpi commSpi = new CustomCommunicationSpi(); + commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + + cfg.setCommunicationSpi(commSpi); + + cfg.setFailureDetectionTimeout(15_000); + + if (appendCustomAttribute) { + Map<String, Object> attrs = new HashMap<>(); + + attrs.put(COORDINATOR_ATTRIBUTE, Boolean.TRUE); + + cfg.setUserAttributes(attrs); + } + + return cfg; + } + + /** + * Tests coordinator failover during cache start failure. + * + * @throws Exception If test failed. + */ + public void testCoordinatorFailure() throws Exception { + // Start coordinator node. + appendCustomAttribute = true; + + Ignite g = startGrid(0); + + appendCustomAttribute = false; + + Ignite g1 = startGrid(1); + Ignite g2 = startGrid(2); + + awaitPartitionMapExchange(); + + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName("test-coordinator-failover"); + + cfg.setAffinity(new BrokenAffinityFunction(false, getTestIgniteInstanceName(2))); + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + g1.getOrCreateCache(cfg); + return null; + } + }, CacheException.class, null); + + return null; + } + }, "cache-starter-thread"); + + latch.await(); + + stopGrid(0, true); + + awaitPartitionMapExchange(); + + // Correct the cache configuration. + cfg.setAffinity(new RendezvousAffinityFunction()); + + IgniteCache cache = g1.getOrCreateCache(cfg); + + checkCacheOperations(g1, cache); + } + + /** + * Test the basic cache operations. + * + * @param cache Cache. + * @throws Exception If test failed. + */ + protected void checkCacheOperations(Ignite ignite, IgniteCache cache) throws Exception { + int cnt = 1000; + + // Check base cache operations. + for (int i = 0; i < cnt; ++i) + cache.put(i, i); + + for (int i = 0; i < cnt; ++i) { + Integer v = (Integer) cache.get(i); + + assertNotNull(v); + assertEquals(i, v.intValue()); + } + + // Check Data Streamer capabilities. + try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { + for (int i = 0; i < 10_000; ++i) + streamer.addData(i, i); + } + } + + /** + * Communication SPI which could optionally block outgoing messages. + */ + private static class CustomCommunicationSpi extends TcpCommunicationSpi { + /** + * Send message optionally either blocking it or throwing an exception if it is of + * {@link GridJobExecuteResponse} type. + * + * @param node Destination node. + * @param msg Message to be sent. + * @param ackClosure Ack closure. + * @throws org.apache.ignite.spi.IgniteSpiException If failed. + */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + + if (msg instanceof GridIoMessage) { + GridIoMessage msg0 = (GridIoMessage)msg; + + if (msg0.message() instanceof GridDhtPartitionsSingleMessage) { + Boolean attr = (Boolean) node.attributes().get(COORDINATOR_ATTRIBUTE); + + GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg0.message(); + + Exception err = singleMsg.getError(); + + if (Boolean.TRUE.equals(attr) && err != null) { + // skip message + latch.countDown(); + + return; + } + } + } + + super.sendMessage(node, msg, ackClosure); + } + } + + /** + * Affinity function that throws an exception when affinity nodes are calculated on the given node. + */ + public static class BrokenAffinityFunction extends RendezvousAffinityFunction { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** Exception should arise on all nodes. */ + private boolean eOnAllNodes = false; + + /** Exception should arise on node with certain name. */ + private String gridName; + + /** + * Default constructor. + */ + public BrokenAffinityFunction() { + // No-op. + } + + /** + * @param eOnAllNodes {@code True} if exception should be thrown on all nodes. + * @param gridName Exception should arise on node with certain name. + */ + public BrokenAffinityFunction(boolean eOnAllNodes, String gridName) { + this.eOnAllNodes = eOnAllNodes; + this.gridName = gridName; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + if (eOnAllNodes || ignite.name().equals(gridName)) + throw new IllegalStateException("Simulated exception [locNodeId=" + + ignite.cluster().localNode().id() + "]"); + else + return super.assignPartitions(affCtx); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailTest.java new file mode 100644 index 0000000..888a458 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Tests the recovery after a dynamic cache start failure. + */ +public class IgniteDynamicCacheStartFailTest extends IgniteAbstractDynamicCacheStartFailTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(gridCount()); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailWithPersistenceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailWithPersistenceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailWithPersistenceTest.java new file mode 100644 index 0000000..3b7bf52 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartFailWithPersistenceTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; + +/** + * Tests the recovery after a dynamic cache start failure, with enabled persistence. + */ +public class IgniteDynamicCacheStartFailWithPersistenceTest extends IgniteAbstractDynamicCacheStartFailTest { + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } + + protected boolean persistenceEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(256L * 1024 * 1024) + .setPersistenceEnabled(true)) + .setWalMode(WALMode.LOG_ONLY)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + cleanPersistenceDir(); + + startGrids(gridCount()); + + grid(0).cluster().active(true); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + protected void checkCacheOperations(IgniteCache<Integer, Value> cache) throws Exception { + super.checkCacheOperations(cache); + + // Disable write-ahead log. + grid(0).cluster().disableWal(cache.getName()); + + try (IgniteDataStreamer<Integer, Value> streamer = grid(0).dataStreamer(cache.getName())) { + for (int i = 10_000; i < 15_000; ++i) + streamer.addData(i, new Value(i)); + } + + grid(0).cluster().enableWal(cache.getName()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 7a4d4be..b973c91 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -78,6 +78,9 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekMo import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest; +import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartCoordinatorFailoverTest; +import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartFailTest; +import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartFailWithPersistenceTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartNoExchangeTimeoutTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartSelfTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartStopConcurrentTest; @@ -225,6 +228,9 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class); suite.addTestSuite(IgniteDynamicCacheMultinodeTest.class); + suite.addTestSuite(IgniteDynamicCacheStartFailTest.class); + suite.addTestSuite(IgniteDynamicCacheStartFailWithPersistenceTest.class); + suite.addTestSuite(IgniteDynamicCacheStartCoordinatorFailoverTest.class); suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class); suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class); suite.addTestSuite(IgniteDynamicCacheStartStopConcurrentTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryAfterDynamicCacheStartFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryAfterDynamicCacheStartFailureTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryAfterDynamicCacheStartFailureTest.java new file mode 100644 index 0000000..f28833e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheQueryAfterDynamicCacheStartFailureTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.List; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; + +public class CacheQueryAfterDynamicCacheStartFailureTest extends IgniteAbstractDynamicCacheStartFailTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(gridCount()); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + protected CacheConfiguration createCacheConfiguration(String cacheName) { + CacheConfiguration cfg = new CacheConfiguration() + .setName(cacheName) + .setIndexedTypes(Integer.class, Value.class); + + return cfg; + } + + protected void checkCacheOperations(IgniteCache<Integer, Value> cache) throws Exception { + super.checkCacheOperations(cache); + + // Check SQL API. + String sql = "fieldVal >= ? and fieldVal <= ?"; + List<Cache.Entry<Integer, Value>> res = cache.query( + new SqlQuery<Integer, Value>(Value.class, sql).setArgs(1, 100)).getAll(); + + assertNotNull(res); + assertEquals(100, res.size()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 12851c5..0a9d8af 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.CacheBinaryKeyConcurrentQuery import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; import org.apache.ignite.internal.processors.cache.CacheOperationsWithExpirationTest; +import org.apache.ignite.internal.processors.cache.CacheQueryAfterDynamicCacheStartFailureTest; import org.apache.ignite.internal.processors.cache.CacheQueryFilterExpiredTest; import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest; import org.apache.ignite.internal.processors.cache.ClientReconnectAfterClusterRestartTest; @@ -79,6 +80,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(ClientReconnectAfterClusterRestartTest.class); + suite.addTestSuite(CacheQueryAfterDynamicCacheStartFailureTest.class); + suite.addTestSuite(IgniteCacheGroupsSqlTest.class); suite.addTestSuite(IgniteDataStreamerTest.class);
