http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 5d0cacc..041a0f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -27,12 +27,14 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; +import javax.cache.processor.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; /** * @@ -46,7 +48,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr /** * @return Keys count for the test. */ - protected abstract int keysCount(); + private int keysCount() { + return 10_000; + } /** {@inheritDoc} */ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { @@ -54,7 +58,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr cfg.setAtomicWriteOrderMode(writeOrderMode()); cfg.setBackups(1); - cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setRebalanceMode(SYNC); return cfg; } @@ -78,25 +82,47 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr protected CacheAtomicWriteOrderMode writeOrderMode() { return CLOCK; } + /** * @throws Exception If failed. */ public void testPut() throws Exception { - checkPut(false); + checkRetry(Test.PUT); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + checkRetry(Test.PUT_ALL); } /** * @throws Exception If failed. */ public void testPutAsync() throws Exception { - checkPut(true); + checkRetry(Test.PUT_ASYNC); + } + + /** + * @throws Exception If failed. + */ + public void testInvoke() throws Exception { + checkRetry(Test.INVOKE); } /** - * @param async If {@code true} tests asynchronous put. * @throws Exception If failed. */ - private void checkPut(boolean async) throws Exception { + public void testInvokeAll() throws Exception { + checkRetry(Test.INVOKE_ALL); + } + + /** + * @param test Test type. + * @throws Exception If failed. + */ + private void checkRetry(Test test) throws Exception { final AtomicBoolean finished = new AtomicBoolean(); int keysCnt = keysCount(); @@ -115,52 +141,140 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr } }); + IgniteCache<Integer, Integer> cache = ignite(0).cache(null); - IgniteCache<Object, Object> cache = ignite(0).cache(null); + int iter = 0; - if (atomicityMode() == ATOMIC) - assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + try { + if (atomicityMode() == ATOMIC) + assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); - int iter = 0; + long stopTime = System.currentTimeMillis() + 60_000; - long stopTime = System.currentTimeMillis() + 60_000; + switch (test) { + case PUT: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; - if (async) { - IgniteCache<Object, Object> cache0 = cache.withAsync(); + for (int i = 0; i < keysCnt; i++) + cache.put(i, val); - while (System.currentTimeMillis() < stopTime) { - Integer val = ++iter; + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } - for (int i = 0; i < keysCnt; i++) { - cache0.put(i, val); + break; + } + + case PUT_ALL: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + Map<Integer, Integer> map = new LinkedHashMap<>(); + + for (int i = 0; i < keysCnt; i++) { + map.put(i, val); - cache0.future().get(); + if (map.size() == 100 || i == keysCnt - 1) + cache.putAll(map); + } + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } } - for (int i = 0; i < keysCnt; i++) { - cache0.get(i); + case PUT_ASYNC: { + IgniteCache<Integer, Integer> cache0 = cache.withAsync(); + + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + for (int i = 0; i < keysCnt; i++) { + cache0.put(i, val); + + cache0.future().get(); + } - assertEquals(val, cache0.future().get()); + for (int i = 0; i < keysCnt; i++) { + cache0.get(i); + + assertEquals(val, cache0.future().get()); + } + } + + break; } - } - } - else { - while (System.currentTimeMillis() < stopTime) { - Integer val = ++iter; - for (int i = 0; i < keysCnt; i++) - cache.put(i, val); + case INVOKE: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + Integer expOld = iter - 1; + + for (int i = 0; i < keysCnt; i++) { + Integer old = cache.invoke(i, new SetEntryProcessor(val)); + + assertNotNull(old); + assertTrue(old.equals(expOld) || old.equals(val)); + } + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } + + break; + } + + case INVOKE_ALL: { + while (System.currentTimeMillis() < stopTime) { + Integer val = ++iter; + + Integer expOld = iter - 1; + + Set<Integer> keys = new LinkedHashSet<>(); + + for (int i = 0; i < keysCnt; i++) { + keys.add(i); + + if (keys.size() == 100 || i == keysCnt - 1) { + Map<Integer, EntryProcessorResult<Integer>> resMap = + cache.invokeAll(keys, new SetEntryProcessor(val)); + + for (Integer key : keys) { + EntryProcessorResult<Integer> res = resMap.get(key); + + assertNotNull(res); + + Integer old = res.get(); + + assertTrue(old.equals(expOld) || old.equals(val)); + } - for (int i = 0; i < keysCnt; i++) - assertEquals(val, cache.get(i)); + assertEquals(keys.size(), resMap.size()); + + keys.clear(); + } + } + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } + + break; + } + + default: + assert false : test; } } - - finished.set(true); - fut.get(); + finally { + finished.set(true); + fut.get(); + } for (int i = 0; i < keysCnt; i++) - assertEquals(iter, cache.get(i)); + assertEquals((Integer)iter, cache.get(i)); } /** @@ -201,34 +315,41 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr try { int keysCnt = keysCount(); - boolean eThrown = false; + boolean eThrown = false; - IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries(); + IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries(); - if (async) - cache = cache.withAsync(); + if (async) + cache = cache.withAsync(); - for (int i = 0; i < keysCnt; i++) { - try { - if (async) { - cache.put(i, i); + long stopTime = System.currentTimeMillis() + 60_000; - cache.future().get(); + while (System.currentTimeMillis() < stopTime) { + for (int i = 0; i < keysCnt; i++) { + try { + if (async) { + cache.put(i, i); + + cache.future().get(); + } + else + cache.put(i, i); + } + catch (Exception e) { + assertTrue("Invalid exception: " + e, + X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class)); + + eThrown = true; + + break; + } } - else - cache.put(i, i); - } - catch (Exception e) { - assertTrue("Invalid exception: " + e, - X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class)); - - eThrown = true; + if (eThrown) break; - } } - assertTrue(eThrown); + assertTrue(eThrown); finished.set(true); @@ -243,4 +364,48 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr @Override protected long getTestTimeout() { return 3 * 60 * 1000; } + + /** + * + */ + enum Test { + /** */ + PUT, + + /** */ + PUT_ALL, + + /** */ + PUT_ASYNC, + + /** */ + INVOKE, + + /** */ + INVOKE_ALL + } + + /** + * + */ + class SetEntryProcessor implements CacheEntryProcessor<Integer, Integer, Integer> { + /** */ + private Integer val; + + /** + * @param val Value. + */ + public SetEntryProcessor(Integer val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) { + Integer old = e.getValue(); + + e.setValue(val); + + return old == null ? 0 : old; + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java index e76663a..be442d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java @@ -16,7 +16,21 @@ */ package org.apache.ignite.internal.processors.cache.distributed.dht; +import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * @@ -24,11 +38,66 @@ import org.apache.ignite.cache.*; public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest { /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.ATOMIC; + return ATOMIC; } - /** {@inheritDoc} */ - @Override protected int keysCount() { - return 60_000; + /** + * @throws Exception If failed. + */ + public void testPutInsideTransaction() throws Exception { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setName("tx-cache"); + ccfg.setAtomicityMode(TRANSACTIONAL); + + try (IgniteCache<Integer, Integer> txCache = ignite(0).getOrCreateCache(ccfg)) { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + try { + IgniteTransactions txs = ignite(0).transactions(); + + IgniteCache<Object, Object> cache = ignite(0).cache(null); + + long stopTime = System.currentTimeMillis() + 60_000; + + while (System.currentTimeMillis() < stopTime) { + for (int i = 0; i < 10_000; i++) { + try { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + txCache.put(0, 0); + + cache.put(i, i); + + tx.commit(); + } + } + catch (IgniteException | CacheException e) { + log.info("Ignore exception: " + e); + } + } + } + + finished.set(true); + + fut.get(); + } + finally { + finished.set(true); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 0ab5729..e113fcc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -32,6 +33,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@ -44,12 +46,12 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.TRANSACTIONAL; + return TRANSACTIONAL; } /** {@inheritDoc} */ - @Override protected int keysCount() { - return 20_000; + @Override protected NearCacheConfiguration nearConfiguration() { + return null; } /** @@ -74,7 +76,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } }); - int keysCnt = keysCount(); + final int keysCnt = 20_000; try { for (int i = 0; i < keysCnt; i++) @@ -90,6 +92,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") public void testExplicitTransactionRetries() throws Exception { final AtomicInteger idx = new AtomicInteger(); int threads = 8; @@ -97,8 +100,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads); IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { int th = idx.getAndIncrement(); int base = th * FACTOR; http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java index 5b9af4f..5bb1706 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java @@ -22,6 +22,8 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.testframework.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; + /** * */ @@ -37,11 +39,6 @@ public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstract /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { - return CacheAtomicityMode.ATOMIC; - } - - /** {@inheritDoc} */ - @Override protected int keysCount() { - return 60_000; + return ATOMIC; } }
