http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java new file mode 100644 index 0000000..217e3f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadLifecycleAbstractTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.io.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Tests for cache preloader. + */ +@SuppressWarnings({"PublicInnerClass"}) +public abstract class GridCachePreloadLifecycleAbstractTest extends GridCommonAbstractTest { + /** */ + protected static final String TEST_STRING = "ABC"; + + /** */ + protected static final GridCachePreloadMode DFLT_PRELOAD_MODE = SYNC; + + /** */ + protected GridCachePreloadMode preloadMode = DFLT_PRELOAD_MODE; + + /** */ + protected LifecycleBean lifecycleBean; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Default keys. */ + protected static final String[] DFLT_KEYS = new String[] { + "Branches", + "CurrencyCurvesAssign", + "CurRefIndex", + "MaturityClasses", + "Folders", + "FloatingRates", + "Swap", + "Portfolios" + }; + + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + c.setIncludeProperties(); + c.setDeploymentMode(IgniteDeploymentMode.SHARED); + c.setNetworkTimeout(10000); + c.setRestEnabled(false); + c.setMarshaller(new IgniteOptimizedMarshaller(false)); + +// c.setPeerClassLoadingLocalClassPathExclude(GridCachePreloadLifecycleAbstractTest.class.getName(), +// MyValue.class.getName()); + + c.setExecutorService(new IgniteThreadPoolExecutor(10, 10, 0, new LinkedBlockingQueue<Runnable>())); + c.setSystemExecutorService(new IgniteThreadPoolExecutor(10, 10, 0, new LinkedBlockingQueue<Runnable>())); + c.setPeerClassLoadingExecutorService(new IgniteThreadPoolExecutor(3, 3, 0, new LinkedBlockingQueue<Runnable>())); + + c.setLifecycleBeans(lifecycleBean); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + preloadMode = DFLT_PRELOAD_MODE; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + lifecycleBean = null; + + stopAllGrids(); + } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 4 * 60 * 1000; // 4 min. + } + + /** + * @param key Key. + * @return Value. + */ + protected String value(Object key) { + return TEST_STRING + '-' + key; + } + + /** + * @param plain Flag to use plain strings. + * @param cnt Number of keys to gen. + * @param lookup Optional key lookup array. + * @return Generated keys. + */ + @SuppressWarnings("IfMayBeConditional") + protected Object[] keys(boolean plain, int cnt, String... lookup) { + Object[] arr = new Object[cnt]; + + for (int i = 0; i < cnt; i++) + if (plain) + arr[i] = i < lookup.length ? lookup[i] : "str-" + i; + else + arr[i] = i < lookup.length ? new MyStringKey(lookup[i]) : new MyStringKey("str-" + i); + + return arr; + } + + /** + * + */ + public static class MyStringKey implements Serializable { + /** Key. */ + private final String str; + + /** + * @param str Key. + */ + public MyStringKey(String str) { + this.str = str; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 + ((str == null) ? 0 : str.hashCode()); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { +// if (this == obj) +// return true; +// +// if (obj == null) +// return false; +// +// if (getClass() != obj.getClass()) +// return false; +// +// MyStringKey other = (MyStringKey) obj; +// +// if (str == null) { +// if (other.str != null) +// return false; +// } +// else if (!str.equals(other.str)) +// return false; +// +// return true; + return toString().equals(obj.toString()); + } + + /** {@inheritDoc} */ + @Override public String toString() { +// return str; + return S.toString(MyStringKey.class, this, "clsLdr", getClass().getClassLoader()); + } + } + + /** + * + */ + public static class MyValue implements Serializable { + /** Data. */ + private final String data; + + /** + * @param data Data. + */ + public MyValue(String data) { + assert data != null; + + this.data = data; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj instanceof MyValue) { + MyValue myObj = (MyValue) obj; + + return data.equals(myObj.data); + } + + return false; + // return data.equals(obj.toString()); + } + + /** {@inheritDoc} */ + @Override public String toString() { +// return data; + return S.toString(MyValue.class, this, "clsLdr", getClass().getClassLoader()); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadRestartAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadRestartAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadRestartAbstractSelfTest.java new file mode 100644 index 0000000..a1fe615 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePreloadRestartAbstractSelfTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheConfiguration.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test node restart. + */ +public abstract class GridCachePreloadRestartAbstractSelfTest extends GridCommonAbstractTest { + /** Flag for debug output. */ + private static final boolean DEBUG = false; + + /** Cache name. */ + private static final String CACHE_NAME = "TEST_CACHE"; + + /** */ + private static final long TEST_TIMEOUT = 5 * 60 * 1000; + + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Preload batch size. */ + private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE; + + /** Number of key backups. Each test method can set this value as required. */ + private int backups = DFLT_BACKUPS; + + /** */ + private static final int DFLT_NODE_CNT = 4; + + /** */ + private static final int DFLT_KEY_CNT = 100; + + /** */ + private static final int DFLT_RETRIES = 2; + + /** */ + private static volatile int idx = -1; + + /** Preload mode. */ + private GridCachePreloadMode preloadMode = ASYNC; + + /** */ + private int preloadBatchSize = DFLT_BATCH_SIZE; + + /** Number of partitions. */ + private int partitions = DFLT_PARTITIONS; + + /** Node count. */ + private int nodeCnt = DFLT_NODE_CNT; + + /** Key count. */ + private int keyCnt = DFLT_KEY_CNT; + + /** Retries. */ + private int retries = DFLT_RETRIES; + + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + // Discovery. + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + c.setDiscoverySpi(disco); + c.setDeploymentMode(CONTINUOUS); + + // Cache. + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setName(CACHE_NAME); + cc.setCacheMode(PARTITIONED); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setStartSize(20); + cc.setPreloadMode(preloadMode); + cc.setPreloadBatchSize(preloadBatchSize); + cc.setAffinity(new GridCacheConsistentHashAffinityFunction(false, partitions)); + cc.setBackups(backups); + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * @return {@code True} if near cache is enabled. + */ + protected abstract boolean nearEnabled(); + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = DFLT_PARTITIONS; + preloadMode = ASYNC; + preloadBatchSize = DFLT_BATCH_SIZE; + nodeCnt = DFLT_NODE_CNT; + keyCnt = DFLT_KEY_CNT; + retries = DFLT_RETRIES; + idx = -1; + +// resetLog4j(Level.DEBUG, true, +// // Categories. +// GridDhtPreloader.class.getPackage().getName(), +// GridDhtPartitionTopologyImpl.class.getName(), +// GridDhtLocalPartition.class.getName()); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** + * @throws Exception If failed. + */ + private void startGrids() throws Exception { + for (int i = 0; i < nodeCnt; i++) { + startGrid(i); + + if (idx < 0) + idx = i; + } + } + + /** + * @throws Exception If failed. + */ + private void stopGrids() throws Exception { + for (int i = 0; i < nodeCnt; i++) + stopGrid(i); + } + + /** + * @throws Exception If failed. + */ + public void testSyncPreloadRestart() throws Exception { + preloadMode = SYNC; + + checkRestart(); + } + + /** + * @throws Exception If failed. + */ + public void testAsyncPreloadRestart() throws Exception { + preloadMode = ASYNC; + + checkRestart(); + } + + /** + * @throws Exception If failed. + */ + public void testDisabledPreloadRestart() throws Exception { + preloadMode = NONE; + + checkRestart(); + } + + /** + * @param cache Cache. + * @return Affinity. + */ + @SuppressWarnings({"unchecked"}) + private GridCacheAffinityFunction affinity(GridCache<Integer, ?> cache) { + return cache.configuration().getAffinity(); + } + + /** + * @param c Cache projection. + */ + private void affinityBeforeStop(GridCache<Integer, String> c) { + for (int key = 0; key < keyCnt; key++) { + int part = affinity(c).partition(key); + + info("Affinity nodes before stop [key=" + key + ", partition" + part + ", nodes=" + + U.nodeIds(c.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + } + } + + /** + * @param c Cache projection. + */ + private void affinityAfterStart(GridCache<Integer, String> c) { + if (DEBUG) { + for (int key = 0; key < keyCnt; key++) { + int part = affinity(c).partition(key); + + info("Affinity odes after start [key=" + key + ", partition" + part + ", nodes=" + + U.nodeIds(c.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + } + } + } + + /** + * @throws Exception If failed. + */ + private void checkRestart() throws Exception { + info("*** STARTING TEST ***"); + + startGrids(); + + try { + GridCache<Integer, String> c = grid(idx).cache(CACHE_NAME); + + for (int j = 0; j < retries; j++) { + for (int i = 0; i < keyCnt; i++) + c.putx(i, Integer.toString(i)); + + info("Stored items."); + + checkGet(c, j); + + info("Stopping node: " + idx); + + affinityBeforeStop(c); + + stopGrid(idx); + + info("Starting node: " + idx); + + Ignite ignite = startGrid(idx); + + c = ignite.cache(CACHE_NAME); + + affinityAfterStart(c); + + checkGet(c, j); + } + } + finally { + stopGrids(); + } + } + + /** + * @param c Cache. + * @param attempt Attempt. + * @throws Exception If failed. + */ + private void checkGet(GridCache<Integer, String> c, int attempt) throws Exception { + for (int i = 0; i < keyCnt; i++) { + String v = c.get(i); + + if (v == null) { + printFailureDetails(c, i, attempt); + + fail("Value is null [key=" + i + ", attempt=" + attempt + "]"); + } + + if (!Integer.toString(i).equals(v)) { + printFailureDetails(c, i, attempt); + + fail("Wrong value for key [key=" + + i + ", actual value=" + v + ", expected value=" + Integer.toString(i) + "]"); + } + } + + info("Read items."); + } + + /** + * @param c Cache projection. + * @param key Key. + * @param attempt Attempt. + */ + private void printFailureDetails(GridCache<Integer, String> c, int key, int attempt) { + error("*** Failure details ***"); + error("Key: " + key); + error("Partition: " + c.configuration().getAffinity().partition(key)); + error("Attempt: " + attempt); + error("Node: " + c.gridProjection().ignite().cluster().localNode().id()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java new file mode 100644 index 0000000..6d83ea9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTransformEventSelfTest.java @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.eclipse.jetty.util.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test for TRANSFORM events recording. + */ +@SuppressWarnings("ConstantConditions") +public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int GRID_CNT = 3; + + /** Backups count for partitioned cache. */ + private static final int BACKUP_CNT = 1; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Closure name. */ + private static final String CLO_NAME = Transformer.class.getName(); + + /** Key 1. */ + private Integer key1; + + /** Key 2. */ + private Integer key2; + + /** Two keys in form of a set. */ + private Set<Integer> keys; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Nodes. */ + private Ignite[] ignites; + + /** Node IDs. */ + private UUID[] ids; + + /** Caches. */ + private IgniteCache<Integer, Integer>[] caches; + + /** Recorded events.*/ + private ConcurrentHashSet<IgniteCacheEvent> evts; + + /** Cache mode. */ + private GridCacheMode cacheMode; + + /** Atomicity mode. */ + private GridCacheAtomicityMode atomicityMode; + + /** TX concurrency. */ + private IgniteTxConcurrency txConcurrency; + + /** TX isolation. */ + private IgniteTxIsolation txIsolation; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + TransactionsConfiguration tCfg = cfg.getTransactionsConfiguration(); + + tCfg.setDefaultTxConcurrency(txConcurrency); + tCfg.setDefaultTxIsolation(txIsolation); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(BACKUP_CNT); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(ccfg); + cfg.setLocalHost("127.0.0.1"); + cfg.setIncludeEventTypes(EVT_CACHE_OBJECT_READ); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + ignites = null; + ids = null; + caches = null; + + evts = null; + + key1 = null; + key2 = null; + keys = null; + } + + /** + * Initialization routine. + * + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + * @param txConcurrency TX concurrency. + * @param txIsolation TX isolation. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void initialize(GridCacheMode cacheMode, GridCacheAtomicityMode atomicityMode, + IgniteTxConcurrency txConcurrency, IgniteTxIsolation txIsolation) throws Exception { + this.cacheMode = cacheMode; + this.atomicityMode = atomicityMode; + this.txConcurrency = txConcurrency; + this.txIsolation = txIsolation; + + evts = new ConcurrentHashSet<>(); + + startGrids(GRID_CNT); + + ignites = new Ignite[GRID_CNT]; + ids = new UUID[GRID_CNT]; + caches = new IgniteCache[GRID_CNT]; + + for (int i = 0; i < GRID_CNT; i++) { + ignites[i] = grid(i); + + ids[i] = ignites[i].cluster().localNode().id(); + + caches[i] = ignites[i].jcache(CACHE_NAME); + + ignites[i].events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + IgniteCacheEvent evt0 = (IgniteCacheEvent)evt; + + if (evt0.closureClassName() != null) { + System.out.println("ADDED: [nodeId=" + evt0.node() + ", evt=" + evt0 + ']'); + + evts.add(evt0); + } + + return true; + } + }, EVT_CACHE_OBJECT_READ); + } + + int key = 0; + + while (true) { + if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) { + key1 = key++; + + break; + } + else + key++; + } + + while (true) { + if (cacheMode != PARTITIONED || (primary(0, key) && backup(1, key))) { + key2 = key; + + break; + } + else + key++; + } + + keys = new HashSet<>(); + + keys.add(key1); + keys.add(key2); + + caches[0].put(key1, 1); + caches[0].put(key2, 2); + + for (int i = 0; i < GRID_CNT; i++) { + ignites[i].events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + IgniteCacheEvent evt0 = (IgniteCacheEvent)evt; + + if (evt0.closureClassName() != null) + evts.add(evt0); + + return true; + } + }, EVT_CACHE_OBJECT_READ); + } + } + + /** + * @param gridIdx Grid index. + * @param key Key. + * @return {@code True} if grid is primary for given key. + */ + private boolean primary(int gridIdx, Object key) { + GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity(); + + return aff.isPrimary(grid(gridIdx).cluster().localNode(), key); + } + + /** + * @param gridIdx Grid index. + * @param key Key. + * @return {@code True} if grid is primary for given key. + */ + private boolean backup(int gridIdx, Object key) { + GridCacheAffinity<Object> aff = grid(0).cache(CACHE_NAME).affinity(); + + return aff.isBackup(grid(gridIdx).cluster().localNode(), key); + } + + /** + * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/REPEATABLE_READ transaction. + * + * @throws Exception If failed. + */ + public void testTxLocalOptimisticRepeatableRead() throws Exception { + checkTx(LOCAL, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/READ_COMMITTED transaction. + * + * @throws Exception If failed. + */ + public void testTxLocalOptimisticReadCommitted() throws Exception { + checkTx(LOCAL, OPTIMISTIC, READ_COMMITTED); + } + + /** + * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/SERIALIZABLE transaction. + * + * @throws Exception If failed. + */ + public void testTxLocalOptimisticSerializable() throws Exception { + checkTx(LOCAL, OPTIMISTIC, SERIALIZABLE); + } + + /** + * Test TRANSACTIONAL LOCAL cache with PESSIMISTIC/REPEATABLE_READ transaction. + * + * @throws Exception If failed. + */ + public void testTxLocalPessimisticRepeatableRead() throws Exception { + checkTx(LOCAL, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * Test TRANSACTIONAL LOCAL cache with PESSIMISTIC/READ_COMMITTED transaction. + * + * @throws Exception If failed. + */ + public void testTxLocalPessimisticReadCommitted() throws Exception { + checkTx(LOCAL, PESSIMISTIC, READ_COMMITTED); + } + + /** + * Test TRANSACTIONAL LOCAL cache with PESSIMISTIC/SERIALIZABLE transaction. + * + * @throws Exception If failed. + */ + public void testTxLocalPessimisticSerializable() throws Exception { + checkTx(LOCAL, PESSIMISTIC, SERIALIZABLE); + } + + /** + * Test TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/REPEATABLE_READ transaction. + * + * @throws Exception If failed. + */ + public void testTxPartitionedOptimisticRepeatableRead() throws Exception { + checkTx(PARTITIONED, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * Test TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/READ_COMMITTED transaction. + * + * @throws Exception If failed. + */ + public void testTxPartitionedOptimisticReadCommitted() throws Exception { + checkTx(PARTITIONED, OPTIMISTIC, READ_COMMITTED); + } + + /** + * Test TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/SERIALIZABLE transaction. + * + * @throws Exception If failed. + */ + public void testTxPartitionedOptimisticSerializable() throws Exception { + checkTx(PARTITIONED, OPTIMISTIC, SERIALIZABLE); + } + + /** + * Test TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/REPEATABLE_READ transaction. + * + * @throws Exception If failed. + */ + public void testTxPartitionedPessimisticRepeatableRead() throws Exception { + checkTx(PARTITIONED, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * Test TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/READ_COMMITTED transaction. + * + * @throws Exception If failed. + */ + public void testTxPartitionedPessimisticReadCommitted() throws Exception { + checkTx(PARTITIONED, PESSIMISTIC, READ_COMMITTED); + } + + /** + * Test TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/SERIALIZABLE transaction. + * + * @throws Exception If failed. + */ + public void testTxPartitionedPessimisticSerializable() throws Exception { + checkTx(PARTITIONED, PESSIMISTIC, SERIALIZABLE); + } + + /** + * Test TRANSACTIONAL REPLICATED cache with OPTIMISTIC/REPEATABLE_READ transaction. + * + * @throws Exception If failed. + */ + public void testTxReplicatedOptimisticRepeatableRead() throws Exception { + checkTx(REPLICATED, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * Test TRANSACTIONAL REPLICATED cache with OPTIMISTIC/READ_COMMITTED transaction. + * + * @throws Exception If failed. + */ + public void testTxReplicatedOptimisticReadCommitted() throws Exception { + checkTx(REPLICATED, OPTIMISTIC, READ_COMMITTED); + } + + /** + * Test TRANSACTIONAL REPLICATED cache with OPTIMISTIC/SERIALIZABLE transaction. + * + * @throws Exception If failed. + */ + public void testTxReplicatedOptimisticSerializable() throws Exception { + checkTx(REPLICATED, OPTIMISTIC, SERIALIZABLE); + } + + /** + * Test TRANSACTIONAL REPLICATED cache with PESSIMISTIC/REPEATABLE_READ transaction. + * + * @throws Exception If failed. + */ + public void testTxReplicatedPessimisticRepeatableRead() throws Exception { + checkTx(REPLICATED, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * Test TRANSACTIONAL REPLICATED cache with PESSIMISTIC/READ_COMMITTED transaction. + * + * @throws Exception If failed. + */ + public void testTxReplicatedPessimisticReadCommitted() throws Exception { + checkTx(REPLICATED, PESSIMISTIC, READ_COMMITTED); + } + + /** + * Test TRANSACTIONAL REPLICATED cache with PESSIMISTIC/SERIALIZABLE transaction. + * + * @throws Exception If failed. + */ + public void testTxReplicatedPessimisticSerializable() throws Exception { + checkTx(REPLICATED, PESSIMISTIC, SERIALIZABLE); + } + + /** + * Test ATOMIC LOCAL cache. + * + * @throws Exception If failed. + */ + public void testAtomicLocal() throws Exception { + checkAtomic(LOCAL); + } + + /** + * Test ATOMIC PARTITIONED cache. + * + * @throws Exception If failed. + */ + public void testAtomicPartitioned() throws Exception { + checkAtomic(PARTITIONED); + } + + /** + * Test ATOMIC REPLICATED cache. + * + * @throws Exception If failed. + */ + public void testAtomicReplicated() throws Exception { + checkAtomic(REPLICATED); + } + + /** + * Check ATOMIC cache. + * + * @param cacheMode Cache mode. + * @throws Exception If failed. + */ + private void checkAtomic(GridCacheMode cacheMode) throws Exception { + initialize(cacheMode, ATOMIC, null, null); + + caches[0].invoke(key1, new Transformer()); + + checkEventNodeIdsStrict(primaryIdsForKeys(key1)); + + assert evts.isEmpty(); + + caches[0].invokeAll(keys, new Transformer()); + + checkEventNodeIdsStrict(primaryIdsForKeys(key1, key2)); + } + + /** + * Check TRANSACTIONAL cache. + * + * @param cacheMode Cache mode. + * @param txConcurrency TX concurrency. + * @param txIsolation TX isolation. + * + * @throws Exception If failed. + */ + private void checkTx(GridCacheMode cacheMode, IgniteTxConcurrency txConcurrency, + IgniteTxIsolation txIsolation) throws Exception { + initialize(cacheMode, TRANSACTIONAL, txConcurrency, txIsolation); + + System.out.println("BEFORE: " + evts.size()); + + caches[0].invoke(key1, new Transformer()); + + System.out.println("AFTER: " + evts.size()); + + checkEventNodeIdsStrict(idsForKeys(key1)); + + assert evts.isEmpty(); + + caches[0].invokeAll(keys, new Transformer()); + + checkEventNodeIdsStrict(idsForKeys(key1, key2)); + } + + /** + * Get node IDs where the given keys must reside. + * + * @param keys Keys. + * @return Node IDs. + */ + private UUID[] idsForKeys(int... keys) { + return idsForKeys(false, keys); + } + + /** + * Get primary node IDs where the given keys must reside. + * + * @param keys Keys. + * @return Node IDs. + */ + private UUID[] primaryIdsForKeys(int... keys) { + return idsForKeys(true, keys); + } + + /** + * Get node IDs where the given keys must reside. + * + * @param primaryOnly Primary only flag. + * @param keys Keys. + * @return Node IDs. + */ + @SuppressWarnings("UnusedDeclaration") + private UUID[] idsForKeys(boolean primaryOnly, int... keys) { + List<UUID> res = new ArrayList<>(); + + if (cacheMode == LOCAL) { + for (int key : keys) + res.add(ids[0]); // Perform PUTs from the node with index 0. + } + else if (cacheMode == PARTITIONED) { + for (int key : keys) { + for (int i = 0; i < GRID_CNT; i++) { + if (primary(i, key) || (!primaryOnly && backup(i, key))) + res.add(ids[i]); + } + } + } + else if (cacheMode == REPLICATED) { + for (int key : keys) { + if (primaryOnly) + res.add(grid(0).cache(CACHE_NAME).affinity().mapKeyToNode(key).id()); + else + res.addAll(Arrays.asList(ids)); + } + } + + return res.toArray(new UUID[res.size()]); + } + + /** + * Ensure that events were recorded on the given nodes. + * + * @param ids Event IDs. + */ + private void checkEventNodeIdsStrict(UUID... ids) { + if (ids == null) + assertTrue(evts.isEmpty()); + else { + assertEquals(ids.length, evts.size()); + + for (UUID id : ids) { + IgniteCacheEvent foundEvt = null; + + for (IgniteCacheEvent evt : evts) { + if (F.eq(id, evt.node().id())) { + assertEquals(CLO_NAME, evt.closureClassName()); + + foundEvt = evt; + + break; + } + } + + if (foundEvt == null) { + int gridIdx = -1; + + for (int i = 0; i < GRID_CNT; i++) { + if (F.eq(this.ids[i], id)) { + gridIdx = i; + + break; + } + } + + fail("Expected transform event was not triggered on the node [nodeId=" + id + + ", key1Primary=" + primary(gridIdx, key1) + ", key1Backup=" + backup(gridIdx, key1) + + ", key2Primary=" + primary(gridIdx, key2) + ", key2Backup=" + backup(gridIdx, key2) + ']'); + } + else + evts.remove(foundEvt); + } + } + } + + /** + * Transform closure. + */ + private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable { + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(e.getValue() + 1); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java new file mode 100644 index 0000000..fbfa310 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.IgniteTxIsolation.REPEATABLE_READ; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * + */ +public abstract class IgniteTxConsistencyRestartAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid count. */ + private static final int GRID_CNT = 4; + + /** Key range. */ + private static final int RANGE = 100_000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Cache configuration. + */ + public CacheConfiguration cacheConfiguration(String gridName) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setCacheMode(cacheMode()); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setDistributionMode(partitionDistributionMode()); + ccfg.setPreloadMode(SYNC); + + if (cacheMode() == GridCacheMode.PARTITIONED) + ccfg.setBackups(1); + + return ccfg; + } + + /** + * @return Cache mode. + */ + protected abstract GridCacheMode cacheMode(); + + /** + * @return Partition distribution mode for PARTITIONED cache. + */ + protected abstract GridCacheDistributionMode partitionDistributionMode(); + + /** + * @throws Exception If failed. + */ + public void testTxConsistency() throws Exception { + startGridsMultiThreaded(GRID_CNT); + + IgniteDataLoader<Object, Object> ldr = grid(0).dataLoader(null); + + for (int i = 0; i < RANGE; i++) { + ldr.addData(i, 0); + + if (i > 0 && i % 1000 == 0) + info("Put keys: " + i); + } + + ldr.close(); + + final AtomicBoolean done = new AtomicBoolean(false); + + Thread restartThread = new Thread() { + @Override public void run() { + Random rnd = new Random(); + + while (!done.get()) { + try { + int idx = rnd.nextInt(GRID_CNT); + + stopGrid(idx); + + startGrid(idx); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + }; + + restartThread.start(); + + Random rnd = new Random(); + + // Make some iterations with 1-3 keys transactions. + for (int i = 0; i < 50_000; i++) { + int idx = i % GRID_CNT; + + if (i > 0 && i % 1000 == 0) + info("Running iteration: " + i); + + try { + GridKernal grid = (GridKernal)grid(idx); + + GridCache<Integer, Integer> cache = grid.cache(null); + + List<Integer> keys = new ArrayList<>(); + + int keyCnt = rnd.nextInt(3); + + for (int k = 0; k < keyCnt; k++) + keys.add(rnd.nextInt(RANGE)); + + Collections.sort(keys); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Map<Integer, Integer> map = cache.getAll(keys); + + for (Map.Entry<Integer, Integer> entry : map.entrySet()) { + assertNotNull("Null value received from cache [key=" + entry.getKey() + "]", entry.getValue()); + + cache.put(entry.getKey(), entry.getValue() + 1); + } + + tx.commit(); + } + } + catch (Exception e) { + info("Failed to update keys: " + e.getMessage()); + } + } + + done.set(true); + + restartThread.join(); + + for (int k = 0; k < RANGE; k++) { + Integer val = null; + + for (int i = 0; i < GRID_CNT; i++) { + GridEx grid = grid(i); + + GridCache<Integer, Integer> cache = grid.cache(null); + + if (cache.affinity().isPrimaryOrBackup(grid.localNode(), k)) { + if (val == null) { + val = cache.peek(k); + + assertNotNull("Failed to peek value for key: " + k, val); + } + else + assertEquals("Failed to find value in cache [primary=" + + cache.affinity().isPrimary(grid.localNode(), k) + ']', + val, cache.peek(k)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java new file mode 100644 index 0000000..d74babc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.internal.managers.communication.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.lang.*; +import org.gridgain.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; + +/** + * Abstract test for originating node failure. + */ +public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest { + /** */ + protected static final int GRID_CNT = 5; + + /** Ignore node ID. */ + private volatile UUID ignoreMsgNodeId; + + /** Ignore message class. */ + private Class<?> ignoreMsgCls; + + /** + * @throws Exception If failed. + */ + public void testManyKeysCommit() throws Exception { + Collection<Integer> keys = new ArrayList<>(200); + + for (int i = 0; i < 200; i++) + keys.add(i); + + testTxOriginatingNodeFails(keys, false); + } + + /** + * @throws Exception If failed. + */ + public void testManyKeysRollback() throws Exception { + Collection<Integer> keys = new ArrayList<>(200); + + for (int i = 0; i < 200; i++) + keys.add(i); + + testTxOriginatingNodeFails(keys, true); + } + + /** + * @return Index of node starting transaction. + */ + protected int originatingNode() { + return 0; + } + + /** + * Ignores messages to given node of given type. + * + * @param dstNodeId Destination node ID. + * @param msgCls Message type. + */ + protected void ignoreMessages(UUID dstNodeId, Class<?> msgCls) { + ignoreMsgNodeId = dstNodeId; + ignoreMsgCls = msgCls; + } + + /** + * Gets ignore message class to simulate partial prepare message. + * + * @return Ignore message class. + */ + protected abstract Class<?> ignoreMessageClass(); + + /** + * @param keys Keys to update. + * @param partial Flag indicating whether to simulate partial prepared state. + * @throws Exception If failed. + */ + protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean partial) throws Exception { + assertFalse(keys.isEmpty()); + + final Collection<GridKernal> grids = new ArrayList<>(); + + ClusterNode txNode = grid(originatingNode()).localNode(); + + for (int i = 1; i < gridCount(); i++) + grids.add((GridKernal)grid(i)); + + final Map<Integer, String> map = new HashMap<>(); + + final String initVal = "initialValue"; + + for (Integer key : keys) { + grid(originatingNode()).cache(null).put(key, initVal); + + map.put(key, String.valueOf(key)); + } + + Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>(); + + GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache(); + + info("Node being checked: " + grid(1).localNode().id()); + + for (Integer key : keys) { + Collection<ClusterNode> nodes = new ArrayList<>(); + + nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key)); + + nodes.remove(txNode); + + nodeMap.put(key, nodes); + } + + info("Starting tx [values=" + map + ", topVer=" + + ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']'); + + if (partial) + ignoreMessages(grid(1).localNode().id(), ignoreMessageClass()); + + final Ignite txIgniteNode = G.ignite(txNode.id()); + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + GridCache<Integer, String> cache = txIgniteNode.cache(null); + + assertNotNull(cache); + + IgniteTxProxyImpl tx = (IgniteTxProxyImpl)cache.txStart(); + + IgniteTxEx txEx = GridTestUtils.getFieldValue(tx, "tx"); + + cache.putAll(map); + + try { + txEx.prepareAsync().get(3, TimeUnit.SECONDS); + } + catch (IgniteFutureTimeoutException ignored) { + info("Failed to wait for prepare future completion: " + partial); + } + + return null; + } + }).get(); + + info("Stopping originating node " + txNode); + + G.stop(G.ignite(txNode.id()).name(), true); + + info("Stopped grid, waiting for transactions to complete."); + + boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (GridKernal g : grids) { + GridCacheSharedContext<Object, Object> ctx = g.context().cache().context(); + + int txNum = ctx.tm().idMapSize(); + + if (txNum != 0) + return false; + } + + return true; + } + }, 10000); + + assertTrue(txFinished); + + info("Transactions finished."); + + for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) { + final Integer key = e.getKey(); + + final String val = map.get(key); + + assertFalse(e.getValue().isEmpty()); + + for (ClusterNode node : e.getValue()) { + compute(G.ignite(node.id()).cluster().forNode(node)).call(new Callable<Void>() { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + @Override public Void call() throws Exception { + GridCache<Integer, String> cache = ignite.cache(null); + + assertNotNull(cache); + + assertEquals(partial ? initVal : val, cache.peek(key)); + + return null; + } + }); + } + } + + for (Map.Entry<Integer, String> e : map.entrySet()) { + for (Ignite g : G.allGrids()) { + UUID locNodeId = g.cluster().localNode().id(); + + assertEquals("Check failed for node: " + locNodeId, partial ? initVal : e.getValue(), + g.cache(null).get(e.getKey())); + } + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + if (!F.eq(ignoreMsgNodeId, node.id()) || !ignoredMessage((GridIoMessage)msg)) + super.sendMessage(node, msg); + } + }); + + cfg.getTransactionsConfiguration().setDefaultTxConcurrency(OPTIMISTIC); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheStoreFactory(null); + cfg.setReadThrough(false); + cfg.setWriteThrough(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + // No-op + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + ignoreMsgCls = null; + ignoreMsgNodeId = null; + } + + /** + * Checks if message should be ignored. + * + * @param msg Message. + * @return {@code True} if message should be ignored. + */ + private boolean ignoredMessage(GridIoMessage msg) { + return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java new file mode 100644 index 0000000..a20c73c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.managers.communication.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.lang.*; +import org.gridgain.testframework.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; + +/** + * Abstract test for originating node failure. + */ +public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest { + /** */ + protected static final int GRID_CNT = 5; + + /** Ignore node ID. */ + private volatile Collection<UUID> ignoreMsgNodeIds; + + /** Ignore message class. */ + private Collection<Class<?>> ignoreMsgCls; + + /** Failing node ID. */ + private UUID failingNodeId; + + /** + * @throws Exception If failed. + */ + public void testManyKeysCommit() throws Exception { + Collection<Integer> keys = new ArrayList<>(200); + + for (int i = 0; i < 200; i++) + keys.add(i); + + testTxOriginatingNodeFails(keys, false); + } + + /** + * @throws Exception If failed. + */ + public void testManyKeysRollback() throws Exception { + Collection<Integer> keys = new ArrayList<>(200); + + for (int i = 0; i < 200; i++) + keys.add(i); + + testTxOriginatingNodeFails(keys, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureCommit() throws Exception { + checkPrimaryNodeCrash(true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureRollback() throws Exception { + checkPrimaryNodeCrash(false); + } + + /** + * @return Index of node starting transaction. + */ + protected int originatingNode() { + return 0; + } + + /** + * Ignores messages to given node of given type. + * + * @param dstNodeIds Destination node IDs. + * @param msgCls Message type. + */ + protected void ignoreMessages(Collection<Class<?>> msgCls, Collection<UUID> dstNodeIds) { + ignoreMsgNodeIds = dstNodeIds; + ignoreMsgCls = msgCls; + } + + /** + * Gets ignore message class to simulate partial prepare message. + * + * @return Ignore message class. + */ + protected abstract Collection<Class<?>> ignoreMessageClasses(); + + /** + * @param keys Keys to update. + * @param fullFailure Flag indicating whether to simulate rollback state. + * @throws Exception If failed. + */ + protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean fullFailure) throws Exception { + assertFalse(keys.isEmpty()); + + final Collection<GridKernal> grids = new ArrayList<>(); + + ClusterNode txNode = grid(originatingNode()).localNode(); + + for (int i = 1; i < gridCount(); i++) + grids.add((GridKernal)grid(i)); + + failingNodeId = grid(0).localNode().id(); + + final Map<Integer, String> map = new HashMap<>(); + + final String initVal = "initialValue"; + + for (Integer key : keys) { + grid(originatingNode()).cache(null).put(key, initVal); + + map.put(key, String.valueOf(key)); + } + + Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>(); + + GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache(); + + info("Node being checked: " + grid(1).localNode().id()); + + for (Integer key : keys) { + Collection<ClusterNode> nodes = new ArrayList<>(); + + nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key)); + + nodes.remove(txNode); + + nodeMap.put(key, nodes); + } + + info("Starting tx [values=" + map + ", topVer=" + + ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']'); + + if (fullFailure) + ignoreMessages(ignoreMessageClasses(), allNodeIds()); + else + ignoreMessages(ignoreMessageClasses(), F.asList(grid(1).localNode().id())); + + final GridEx originatingNodeGrid = grid(originatingNode()); + + GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + GridCache<Integer, String> cache = originatingNodeGrid.cache(null); + + assertNotNull(cache); + + IgniteTx tx = cache.txStart(); + + try { + cache.putAll(map); + + info("Before commitAsync"); + + tx = (IgniteTx)tx.enableAsync(); + + tx.commit(); + + IgniteFuture<IgniteTx> fut = tx.future(); + + info("Got future for commitAsync()."); + + fut.get(3, TimeUnit.SECONDS); + } + catch (IgniteFutureTimeoutException ignored) { + info("Failed to wait for commit future completion [fullFailure=" + fullFailure + ']'); + } + + return null; + } + }).get(); + + info(">>> Stopping originating node " + txNode); + + G.stop(grid(originatingNode()).name(), true); + + ignoreMessages(Collections.<Class<?>>emptyList(), Collections.<UUID>emptyList()); + + info(">>> Stopped originating node: " + txNode.id()); + + boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (GridKernal g : grids) { + GridCacheAdapter<?, ?> cache = g.internalCache(); + + IgniteTxManager txMgr = cache.isNear() ? + ((GridNearCacheAdapter)cache).dht().context().tm() : + cache.context().tm(); + + int txNum = txMgr.idMapSize(); + + if (txNum != 0) + return false; + } + + return true; + } + }, 10000); + + assertTrue(txFinished); + + info("Transactions finished."); + + for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) { + final Integer key = e.getKey(); + + final String val = map.get(key); + + assertFalse(e.getValue().isEmpty()); + + for (ClusterNode node : e.getValue()) { + final UUID checkNodeId = node.id(); + + compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + @Override public Void call() throws Exception { + GridCache<Integer, String> cache = ignite.cache(null); + + assertNotNull(cache); + + assertEquals("Failed to check entry value on node: " + checkNodeId, + fullFailure ? initVal : val, cache.peek(key)); + + return null; + } + }); + } + } + + for (Map.Entry<Integer, String> e : map.entrySet()) { + for (Ignite g : G.allGrids()) + assertEquals(fullFailure ? initVal : e.getValue(), g.cache(null).get(e.getKey())); + } + } + + /** + * Checks tx data consistency in case when primary node crashes. + * + * @param commmit Whether to commit or rollback a transaction. + * @throws Exception If failed. + */ + private void checkPrimaryNodeCrash(final boolean commmit) throws Exception { + Collection<Integer> keys = new ArrayList<>(20); + + for (int i = 0; i < 20; i++) + keys.add(i); + + final Collection<GridKernal> grids = new ArrayList<>(); + + ClusterNode primaryNode = grid(1).localNode(); + + for (int i = 0; i < gridCount(); i++) { + if (i != 1) + grids.add((GridKernal)grid(i)); + } + + failingNodeId = primaryNode.id(); + + final Map<Integer, String> map = new HashMap<>(); + + final String initVal = "initialValue"; + + for (Integer key : keys) { + grid(originatingNode()).cache(null).put(key, initVal); + + map.put(key, String.valueOf(key)); + } + + Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>(); + + GridCache<Integer, String> cache = grid(0).cache(null); + + info("Failing node ID: " + grid(1).localNode().id()); + + for (Integer key : keys) { + Collection<ClusterNode> nodes = new ArrayList<>(); + + nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key)); + + nodes.remove(primaryNode); + + nodeMap.put(key, nodes); + } + + info("Starting tx [values=" + map + ", topVer=" + + ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']'); + + assertNotNull(cache); + + try (IgniteTx tx = cache.txStart()) { + cache.getAll(keys); + + // Should not send any messages. + cache.putAll(map); + + // Fail the node in the middle of transaction. + info(">>> Stopping primary node " + primaryNode); + + G.stop(G.ignite(primaryNode.id()).name(), true); + + info(">>> Stopped originating node, finishing transaction: " + primaryNode.id()); + + if (commmit) + tx.commit(); + else + tx.rollback(); + } + + boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (GridKernal g : grids) { + GridCacheAdapter<?, ?> cache = g.internalCache(); + + IgniteTxManager txMgr = cache.isNear() ? + ((GridNearCacheAdapter)cache).dht().context().tm() : + cache.context().tm(); + + int txNum = txMgr.idMapSize(); + + if (txNum != 0) + return false; + } + + return true; + } + }, 10000); + + assertTrue(txFinished); + + info("Transactions finished."); + + for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) { + final Integer key = e.getKey(); + + final String val = map.get(key); + + assertFalse(e.getValue().isEmpty()); + + for (ClusterNode node : e.getValue()) { + final UUID checkNodeId = node.id(); + + compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + @Override public Void call() throws Exception { + GridCache<Integer, String> cache = ignite.cache(null); + + assertNotNull(cache); + + assertEquals("Failed to check entry value on node: " + checkNodeId, + !commmit ? initVal : val, cache.peek(key)); + + return null; + } + }); + } + } + + for (Map.Entry<Integer, String> e : map.entrySet()) { + for (Ignite g : G.allGrids()) + assertEquals(!commmit ? initVal : e.getValue(), g.cache(null).get(e.getKey())); + } + } + + /** + * @return All node IDs. + */ + private Collection<UUID> allNodeIds() { + Collection<UUID> nodeIds = new ArrayList<>(gridCount()); + + for (int i = 0; i < gridCount(); i++) + nodeIds.add(grid(i).localNode().id()); + + return nodeIds; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + if (getSpiContext().localNode().id().equals(failingNodeId)) { + if (ignoredMessage((GridIoMessage)msg) && ignoreMsgNodeIds != null) { + for (UUID ignored : ignoreMsgNodeIds) { + if (node.id().equals(ignored)) + return; + } + } + } + + super.sendMessage(node, msg); + } + }); + + cfg.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheStoreFactory(null); + cfg.setReadThrough(false); + cfg.setWriteThrough(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + // No-op + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + ignoreMsgCls = null; + ignoreMsgNodeIds = null; + } + + /** + * Checks if message should be ignored. + * + * @param msg Message. + * @return {@code True} if message should be ignored. + */ + private boolean ignoredMessage(GridIoMessage msg) { + Collection<Class<?>> ignoreClss = ignoreMsgCls; + + if (ignoreClss != null) { + for (Class<?> ignoreCls : ignoreClss) { + if (ignoreCls.isAssignableFrom(msg.message().getClass())) + return true; + } + + return false; + } + else + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java new file mode 100644 index 0000000..36546eb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests transaction during cache preloading. + */ +public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 6; + + /** */ + private static volatile boolean keyNotLoaded; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + keyNotLoaded = false; + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testRemoteTxPreloading() throws Exception { + IgniteCache<String, Integer> cache = jcache(0); + + for (int i = 0; i < 10000; i++) + cache.put(String.valueOf(i), 0); + + final AtomicInteger gridIdx = new AtomicInteger(1); + + IgniteFuture<?> fut = GridTestUtils.runMultiThreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + int idx = gridIdx.getAndIncrement(); + + startGrid(idx); + + return null; + } + }, + GRID_CNT - 1, + "grid-starter-" + getName() + ); + + waitForRemoteNodes(grid(0), 2); + + Set<String> keys = new HashSet<>(); + + for (int i = 0; i < 10; i++) + keys.add(String.valueOf(i * 1000)); + + cache.invokeAll(keys, new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + Integer val = e.getValue(); + + if (val == null) { + keyNotLoaded = true; + + e.setValue(1); + + return null; + } + + e.setValue(val + 1); + + return null; + } + }); + + assertFalse(keyNotLoaded); + + fut.get(); + + for (int i = 0; i < GRID_CNT; i++) + // Wait for preloader. + cache(i).forceRepartition().get(); + + for (int i = 0; i < GRID_CNT; i++) { + for (String key : keys) + assertEquals("Unexpected value for cache " + i, (Integer)1, cache(i).get(key)); + } + } + + /** + * @throws Exception If failed. + */ + public void testLocalTxPreloadingOptimistic() throws Exception { + testLocalTxPreloading(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testLocalTxPreloadingPessimistic() throws Exception { + testLocalTxPreloading(PESSIMISTIC); + } + + /** + * Tries to execute transaction doing transform when target key is not yet preloaded. + * + * @param txConcurrency Transaction concurrency; + * @throws Exception If failed. + */ + private void testLocalTxPreloading(IgniteTxConcurrency txConcurrency) throws Exception { + Map<String, Integer> map = new HashMap<>(); + + for (int i = 0; i < 10000; i++) + map.put(String.valueOf(i), 0); + + IgniteCache<String, Integer> cache0 = jcache(0); + + cache0.putAll(map); + + final String TX_KEY = "9000"; + + int expVal = 0; + + for (int i = 1; i < GRID_CNT; i++) { + assertEquals((Integer)expVal, cache0.get(TX_KEY)); + + startGrid(i); + + IgniteCache<String, Integer> cache = jcache(i); + + IgniteTransactions txs = ignite(i).transactions(); + + try (IgniteTx tx = txs.txStart(txConcurrency, IgniteTxIsolation.READ_COMMITTED)) { + cache.invoke(TX_KEY, new EntryProcessor<String, Integer, Void>() { + @Override public Void process(MutableEntry<String, Integer> e, Object... args) { + Integer val = e.getValue(); + + if (val == null) { + keyNotLoaded = true; + + e.setValue(1); + + return null; + } + + e.setValue(val + 1); + + return null; + } + }); + + tx.commit(); + } + + assertFalse(keyNotLoaded); + + expVal++; + + assertEquals((Integer)expVal, cache.get(TX_KEY)); + } + + for (int i = 0; i < GRID_CNT; i++) + assertEquals("Unexpected value for cache " + i, (Integer)expVal, cache(i).get(TX_KEY)); + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setPreloadMode(ASYNC); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheStoreFactory(null); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java new file mode 100644 index 0000000..d109137 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.transactions.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Simple cache test. + */ +public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Grid count. */ + private static final int GRID_COUNT = 2; + + /** Grid instances. */ + private static final List<Ignite> IGNITEs = new ArrayList<>(); + + /** Transaction timeout. */ + private static final long TIMEOUT = 50; + + /** + * @throws Exception If failed. + */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < GRID_COUNT; i++) + IGNITEs.add(startGrid(i)); + } + + /** + * @throws Exception If failed. + */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + IGNITEs.clear(); + } + + /** + * @param i Grid index. + * @return Cache. + */ + @Override protected <K, V> GridCache<K, V> cache(int i) { + return IGNITEs.get(i).cache(null); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommitted() throws Exception { + checkTransactionTimeout(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableRead() throws Exception { + checkTransactionTimeout(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializable() throws Exception { + checkTransactionTimeout(PESSIMISTIC, SERIALIZABLE); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommitted() throws Exception { + checkTransactionTimeout(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableRead() throws Exception { + checkTransactionTimeout(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializable() throws Exception { + checkTransactionTimeout(OPTIMISTIC, SERIALIZABLE); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws IgniteCheckedException If test failed. + */ + private void checkTransactionTimeout(IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation) throws Exception { + + int idx = RAND.nextInt(GRID_COUNT); + + GridCache<Integer, String> cache = cache(idx); + + IgniteTx tx = cache.txStart(concurrency, isolation, TIMEOUT, 0); + + try { + info("Storing value in cache [key=1, val=1]"); + + cache.put(1, "1"); + + long sleep = TIMEOUT * 2; + + info("Going to sleep for (ms): " + sleep); + + Thread.sleep(sleep); + + info("Storing value in cache [key=1, val=2]"); + + cache.put(1, "2"); + + info("Committing transaction: " + tx); + + tx.commit(); + + assert false : "Timeout never happened for transaction: " + tx; + } + catch (IgniteTxTimeoutException e) { + info("Received expected timeout exception [msg=" + e.getMessage() + ", tx=" + tx + ']'); + } + finally { + tx.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java index 23e34dc..4e95880 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.apache.ignite.spi.swapspace.file.*; import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java index 5002961..34cdf21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; import static org.apache.ignite.cache.GridCacheAtomicityMode.*; import static org.apache.ignite.cache.GridCacheMode.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java index b609d29..f9fa4e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; import static org.apache.ignite.cache.GridCacheAtomicityMode.*;