Fixed "IGNITE-2630 Make 'updateCntr' available through CacheInterceptor API".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cead1e0d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cead1e0d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cead1e0d Branch: refs/heads/ignite-1786 Commit: cead1e0d31479f3925bfea092c29a85c4f40469d Parents: d3569f7 Author: nikolay_tikhonov <[email protected]> Authored: Thu Apr 7 20:06:12 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Thu Apr 7 20:06:33 2016 +0300 ---------------------------------------------------------------------- .../ignite/cache/CacheInterceptorEntry.java | 39 + .../cache/query/CacheQueryEntryEvent.java | 12 +- .../processors/cache/CacheLazyEntry.java | 47 +- .../processors/cache/GridCacheMapEntry.java | 19 +- ...erceptorPartitionCounterLocalSanityTest.java | 687 ++++++++++++ ...torPartitionCounterRandomOperationsTest.java | 1054 ++++++++++++++++++ .../IgniteCacheInterceptorSelfTestSuite.java | 2 + 7 files changed, 1844 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java new file mode 100644 index 0000000..61be00a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheInterceptorEntry.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache; + +import javax.cache.Cache; + +/** + * A cache interceptor map entry. + * + * @param <K> The type of key. + * @param <V> The type of value. + */ +public abstract class CacheInterceptorEntry<K, V> implements Cache.Entry<K, V> { + /** + * Each cache update increases partition counter. The same cache updates have on the same value of counter + * on primary and backup nodes. This value can be useful to communicate with external applications. + * The value has sense only for entries get by {@link CacheInterceptor#onAfterPut(Cache.Entry)} and + * {@link CacheInterceptor#onAfterRemove(Cache.Entry)} methods. For entries got by other methods will return + * {@code 0}. + * + * @return Value of counter for this entry. + */ + public abstract long getPartitionUpdateCounter(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java index 2c1c5e6..3a7994f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java @@ -24,18 +24,18 @@ import javax.cache.event.EventType; /** * A Cache continuous query entry event. * - * @param <K> the type of key - * @param <V> the type of value + * @param <K> The type of key. + * @param <V> The type of value. */ public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> { /** * Constructs a cache entry event from a given cache as source. * - * @param source the cache that originated the event - * @param eventType Event type. + * @param src The cache that originated the event. + * @param evtType Event type. */ - public CacheQueryEntryEvent(Cache source, EventType eventType) { - super(source, eventType); + public CacheQueryEntryEvent(Cache src, EventType evtType) { + super(src, evtType); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java index c1fcb77..c8cfc99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.processors.cache; -import javax.cache.Cache; import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheInterceptorEntry; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; /** * */ -public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { +public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> { /** Cache context. */ protected GridCacheContext cctx; @@ -46,6 +46,9 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { /** Keep binary flag. */ private boolean keepBinary; + /** Update counter. */ + private Long updateCntr; + /** * @param cctx Cache context. * @param keyObj Key cache object. @@ -78,6 +81,32 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { * @param key Key value. * @param valObj Cache object * @param keepBinary Keep binary flag. + * @param updateCntr Partition update counter. + * @param val Cache value. + */ + public CacheLazyEntry(GridCacheContext ctx, + KeyCacheObject keyObj, + K key, + CacheObject valObj, + V val, + boolean keepBinary, + Long updateCntr + ) { + this.cctx = ctx; + this.keyObj = keyObj; + this.key = key; + this.valObj = valObj; + this.val = val; + this.keepBinary = keepBinary; + this.updateCntr = updateCntr; + } + + /** + * @param ctx Cache context. + * @param keyObj Key cache object. + * @param key Key value. + * @param valObj Cache object + * @param keepBinary Keep binary flag. * @param val Cache value. */ public CacheLazyEntry(GridCacheContext ctx, @@ -144,6 +173,20 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { } /** {@inheritDoc} */ + @Override public long getPartitionUpdateCounter() { + return updateCntr == null ? 0L : updateCntr; + } + + /** + * Sets update counter. + * + * @param updateCntr Update counter. + */ + public void updateCounter(Long updateCntr) { + this.updateCntr = updateCntr; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> cls) { if (cls.isAssignableFrom(Ignite.class)) http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index a448307..1a052f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1258,7 +1258,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.store().put(tx, key, val, newVer); if (intercept) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary)); + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepBinary, updateCntr0)); return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) : new GridCacheUpdateTxResult(false, null); @@ -1308,7 +1308,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteBiTuple<Boolean, Object> interceptRes = null; - Cache.Entry entry0 = null; + CacheLazyEntry entry0 = null; Long updateCntr0; @@ -1473,8 +1473,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme onMarkedObsolete(); } - if (intercept) + if (intercept) { + entry0.updateCounter(updateCntr0); + cctx.config().getInterceptor().onAfterRemove(entry0); + } if (valid) { CacheObject ret; @@ -1820,9 +1823,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) { if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary)); + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L)); else - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary)); + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L)); } } @@ -2394,7 +2397,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else { if (intercept) { interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0, - oldVal, old0, keepBinary)); + oldVal, old0, keepBinary, updateCntr0)); if (cctx.cancelRemove(interceptRes)) return new GridCacheUpdateAtomicResult(false, @@ -2497,9 +2500,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) { if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary)); + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, updateCntr0)); else - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary)); + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary, updateCntr0)); if (interceptRes != null) oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java new file mode 100644 index 0000000..5db2781 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterLocalSanityTest.java @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheInterceptorAdapter; +import org.apache.ignite.cache.CacheInterceptorEntry; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.jetbrains.annotations.NotNull; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheInterceptorPartitionCounterLocalSanityTest extends GridCommonAbstractTest { + /** */ + private static final int NODES = 1; + + /** */ + private static final int KEYS = 50; + + /** */ + private static final int VALS = 10; + + /** */ + private static final int ITERATION_CNT = 100; + + /** */ + private static BlockingQueue<Cache.Entry<TestKey, TestValue>> afterPutEvts; + + /** */ + private static BlockingQueue<Cache.Entry<TestKey, TestValue>> afterRmvEvts; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + afterPutEvts = new BlockingArrayQueue<>(); + afterRmvEvts = new BlockingArrayQueue<>(); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration( + ATOMIC, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testLocalWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration( + ATOMIC, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testLocalTx() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration( + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testLocalTxWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration( + TRANSACTIONAL, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void doTestPartitionCounterOperation(CacheConfiguration<Object, Object> ccfg) + throws Exception { + ignite(0).createCache(ccfg); + + try { + long seed = System.currentTimeMillis(); + + Random rnd = new Random(seed); + + log.info("Random seed: " + seed); + + ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 20 == 0) + log.info("Iteration: " + i); + + randomUpdate(rnd, expData, grid(0).cache(ccfg.getName())); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param rnd Random generator. + * @param expData Expected cache data. + * @param cache Cache. + * @throws Exception If failed. + */ + private void randomUpdate( + Random rnd, + ConcurrentMap<Object, Object> expData, + IgniteCache<Object, Object> cache) + throws Exception { + Object key = new TestKey(rnd.nextInt(KEYS)); + Object newVal = value(rnd); + Object oldVal = expData.get(key); + + int op = rnd.nextInt(11); + + Ignite ignite = cache.unwrap(Ignite.class); + + Transaction tx = null; + + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); + + try { + //log.info("Random operation [key=" + key + ", op=" + op + ']'); + + switch (op) { + case 0: { + cache.put(key, newVal); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, newVal, oldVal, false); + + expData.put(key, newVal); + + break; + } + + case 1: { + cache.getAndPut(key, newVal); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, newVal, oldVal, false); + + expData.put(key, newVal); + + break; + } + + case 2: { + cache.remove(key); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, null, oldVal, true); + + expData.remove(key); + + break; + } + + case 3: { + cache.getAndRemove(key); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, null, oldVal, true); + + expData.remove(key); + + break; + } + + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, newVal, oldVal, false); + + expData.put(key, newVal); + + break; + } + + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, null, oldVal, true); + + expData.remove(key); + + break; + } + + case 6: { + cache.putIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + waitAndCheckEvent(key, newVal, null, false); + + expData.put(key, newVal); + } + else + checkNoEvent(afterPutEvts); + + break; + } + + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + waitAndCheckEvent(key, newVal, null, false); + + expData.put(key, newVal); + } + else + checkNoEvent(afterPutEvts); + + break; + } + + case 8: { + cache.replace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + waitAndCheckEvent(key, newVal, oldVal, false); + + expData.put(key, newVal); + } + else + checkNoEvent(afterPutEvts); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + waitAndCheckEvent(key, newVal, oldVal, false); + + expData.put(key, newVal); + } + else + checkNoEvent(afterPutEvts); + + break; + } + + case 10: { + if (oldVal != null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + waitAndCheckEvent(key, newVal, oldVal, false); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(afterPutEvts); + } + } + else { + cache.replace(key, value(rnd), newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(afterPutEvts); + } + + break; + } + + default: + fail("Op:" + op); + } + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionIsolation}. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + int val = rnd.nextInt(3); + + if (val == 0) + return READ_COMMITTED; + else if (val == 1) + return REPEATABLE_READ; + else + return SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** + * @param rnd Random generator. + * @return Cache value. + */ + private static Object value(Random rnd) { + return new TestValue(rnd.nextInt(VALS)); + } + + /** + * @param key Key. + * @param val Value. + * @param oldVal Old value. + * @param rmv Remove operation. + * @throws Exception If failed. + */ + private void waitAndCheckEvent( + Object key, + Object val, + Object oldVal, + boolean rmv) + throws Exception { + BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue = rmv ? afterRmvEvts : afterPutEvts; + + if (val == null && oldVal == null) { + checkNoEvent(evtsQueue); + + return; + } + + Cache.Entry<TestKey, TestValue> entry = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', entry); + assertEquals(key, entry.getKey()); + assertEquals(rmv ? oldVal : val, entry.getValue()); + + CacheInterceptorEntry interceptorEntry = entry.unwrap(CacheInterceptorEntry.class); + + assertNotNull(interceptorEntry); + + // For local cache partition counter always zero. + assertEquals(0, interceptorEntry.getPartitionUpdateCounter()); + + assertNull(evtsQueue.peek()); + } + + /** + * @param evtsQueue Event queue. + * @throws Exception If failed. + */ + private void checkNoEvent(BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue) throws Exception { + Cache.Entry<TestKey, TestValue> evt = evtsQueue.poll(50, MILLISECONDS); + + assertTrue(evt == null || evt.getValue() == null); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param store If {@code true} configures dummy cache store. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + boolean store) { + CacheConfiguration<TestKey, TestValue> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setCacheMode(LOCAL); + + if (store) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + } + + ccfg.setInterceptor(new TestInterceptor()); + + return (CacheConfiguration)ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } + + /** + * + */ + public static class TestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public TestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey that = (TestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull Object o) { + return key - ((TestKey)o).key; + } + } + + /** + * + */ + private static class TestInterceptor extends CacheInterceptorAdapter<TestKey, TestValue> { + /** {@inheritDoc} */ + @Override public void onAfterPut(Cache.Entry<TestKey, TestValue> e) { + e.getKey(); + e.getValue(); + + afterPutEvts.add(e); + } + + /** {@inheritDoc} */ + @Override public void onAfterRemove(Cache.Entry<TestKey, TestValue> e) { + e.getKey(); + e.getValue(); + + afterRmvEvts.add(e); + } + } + + /** + * + */ + public static class TestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public TestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue that = (TestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * + */ + private static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> { + /** */ + private Object val; + + /** */ + private boolean retOld; + + /** + * @param val Value to set. + * @param retOld Return old value flag. + */ + EntrySetValueProcessor(Object val, boolean retOld) { + this.val = val; + this.retOld = retOld; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + Object old = retOld ? e.getValue() : null; + + if (val != null) + e.setValue(val); + else + e.remove(); + + return old; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(EntrySetValueProcessor.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java new file mode 100644 index 0000000..055374d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheInterceptorPartitionCounterRandomOperationsTest.java @@ -0,0 +1,1054 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheInterceptorAdapter; +import org.apache.ignite.cache.CacheInterceptorEntry; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.jetbrains.annotations.NotNull; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheInterceptorPartitionCounterRandomOperationsTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + private static final int KEYS = 50; + + /** */ + private static final int VALS = 10; + + /** */ + public static final int ITERATION_CNT = 100; + + /** */ + private boolean client; + + /** */ + private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<TestKey, TestValue>>> + afterPutEvts = new ConcurrentHashMap<>(); + + /** */ + private static ConcurrentMap<UUID, BlockingQueue<Cache.Entry<TestKey, TestValue>>> + afterRmvEvts = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + afterPutEvts.clear(); + afterRmvEvts.clear(); + + for (int i = 0; i < NODES; i++) { + afterRmvEvts.put(grid(i).cluster().localNode().id(), + new BlockingArrayQueue<Cache.Entry<TestKey, TestValue>>()); + afterPutEvts.put(grid(i).cluster().localNode().id(), + new BlockingArrayQueue<Cache.Entry<TestKey, TestValue>>()); + } + } + + /** + * @throws Exception If failed. + */ + public void testAtomic() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicated() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValues() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_VALUES, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_VALUES, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoBackups() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTx() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicated() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicatedWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValues() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_VALUES, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValuesExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_VALUES, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTieredExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoBackups() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoBackupsWithStore() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + true); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoBackupsExplicit() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + doTestPartitionCounterOperation(ccfg); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + protected void doTestPartitionCounterOperation(CacheConfiguration<Object, Object> ccfg) + throws Exception { + ignite(0).createCache(ccfg); + + try { + long seed = System.currentTimeMillis(); + + Random rnd = new Random(seed); + + log.info("Random seed: " + seed); + + ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + + Map<Integer, Long> partCntr = new ConcurrentHashMap<>(); + + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 20 == 0) + log.info("Iteration: " + i); + + for (int idx = 0; idx < NODES; idx++) + randomUpdate(rnd, expData, partCntr, grid(idx).cache(ccfg.getName())); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param rnd Random generator. + * @param expData Expected cache data. + * @param partCntr Partition counter. + * @param cache Cache. + * @throws Exception If failed. + */ + private void randomUpdate( + Random rnd, + ConcurrentMap<Object, Object> expData, + Map<Integer, Long> partCntr, + IgniteCache<Object, Object> cache) + throws Exception { + Object key = new TestKey(rnd.nextInt(KEYS)); + Object newVal = value(rnd); + Object oldVal = expData.get(key); + + int op = rnd.nextInt(11); + + Ignite ignite = cache.unwrap(Ignite.class); + + Transaction tx = null; + + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); + + try { + //log.info("Random operation [key=" + key + ", op=" + op + ']'); + + switch (op) { + case 0: { + cache.put(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false); + + expData.put(key, newVal); + + break; + } + + case 1: { + cache.getAndPut(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false); + + expData.put(key, newVal); + + break; + } + + case 2: { + cache.remove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true); + + expData.remove(key); + + break; + } + + case 3: { + cache.getAndRemove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true); + + expData.remove(key); + + break; + } + + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false); + + expData.put(key, newVal); + + break; + } + + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, null, oldVal, true); + + expData.remove(key); + + break; + } + + case 6: { + cache.putIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, null, false); + + expData.put(key, newVal); + } + else + checkNoEvent(getInterceptorQueues(cache, key, false)); + + break; + } + + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, null, false); + + expData.put(key, newVal); + } + else + checkNoEvent(getInterceptorQueues(cache, key, false)); + + break; + } + + case 8: { + cache.replace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false); + + expData.put(key, newVal); + } + else + checkNoEvent(getInterceptorQueues(cache, key, false)); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false); + + expData.put(key, newVal); + } + else + checkNoEvent(getInterceptorQueues(cache, key, false)); + + break; + } + + case 10: { + if (oldVal != null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(cache, partCntr, affinity(cache), key, newVal, oldVal, false); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(getInterceptorQueues(cache, key, false)); + } + } + else { + cache.replace(key, value(rnd), newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(getInterceptorQueues(cache, key, false)); + } + + break; + } + + default: + fail("Op:" + op); + } + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @param cache Cache. + * @param key Key. + * @param rmv Remove operation. + * @return Queues. + */ + @NotNull private List<BlockingQueue<Cache.Entry<TestKey, TestValue>>> getInterceptorQueues( + IgniteCache<Object, Object> cache, + Object key, + boolean rmv + ) { + Collection<ClusterNode> nodes = + cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL ? + affinity(cache).mapKeyToPrimaryAndBackups(key) : + Collections.singletonList(affinity(cache).mapKeyToNode(key)); + + assert nodes.size() > 0; + + List<BlockingQueue<Cache.Entry<TestKey, TestValue>>> queues = new ArrayList<>(); + + for (ClusterNode node : nodes) + queues.add(rmv ? afterRmvEvts.get(node.id()) : afterPutEvts.get(node.id())); + + return queues; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionIsolation}. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + int val = rnd.nextInt(3); + + if (val == 0) + return READ_COMMITTED; + else if (val == 1) + return REPEATABLE_READ; + else + return SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** + * @param cache Cache. + * @param key Key + * @param cntrs Partition counters. + */ + private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) { + Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName()); + + int part = aff.partition(key); + + Long partCntr = cntrs.get(part); + + if (partCntr == null) + partCntr = 0L; + + cntrs.put(part, ++partCntr); + } + + /** + * @param rnd Random generator. + * @return Cache value. + */ + private static Object value(Random rnd) { + return new TestValue(rnd.nextInt(VALS)); + } + + /** + * @param cache Ignite cache. + * @param partCntrs Partition counters. + * @param aff Affinity function. + * @param key Key. + * @param val Value. + * @param oldVal Old value. + * @param rmv Remove operation. + * @throws Exception If failed. + */ + private void waitAndCheckEvent(IgniteCache<Object, Object> cache, + Map<Integer, Long> partCntrs, + Affinity<Object> aff, + Object key, + Object val, + Object oldVal, + boolean rmv) + throws Exception { + Collection<BlockingQueue<Cache.Entry<TestKey, TestValue>>> entries = getInterceptorQueues(cache, key, + rmv); + + assert !entries.isEmpty(); + + if (val == null && oldVal == null) { + checkNoEvent(entries); + + return; + } + + for (BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue : entries) { + Cache.Entry<TestKey, TestValue> entry = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', entry); + assertEquals(key, entry.getKey()); + assertEquals(rmv ? oldVal : val, entry.getValue()); + + long cntr = partCntrs.get(aff.partition(key)); + CacheInterceptorEntry interceptorEntry = entry.unwrap(CacheInterceptorEntry.class); + + assertNotNull(cntr); + assertNotNull(interceptorEntry); + + assertEquals(cntr, interceptorEntry.getPartitionUpdateCounter()); + + assertNull(evtsQueue.peek()); + } + } + + /** + * @param evtsQueues Event queue. + * @throws Exception If failed. + */ + private void checkNoEvent(Collection<BlockingQueue<Cache.Entry<TestKey, TestValue>>> evtsQueues) + throws Exception { + for (BlockingQueue<Cache.Entry<TestKey, TestValue>> evtsQueue : evtsQueues) { + Cache.Entry<TestKey, TestValue> evt = evtsQueue.poll(50, MILLISECONDS); + + assertTrue(evt == null || evt.getValue() == null); + } + } + + /** + * + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param store If {@code true} configures dummy cache store. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + boolean store) { + CacheConfiguration<TestKey, TestValue> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + if (store) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + } + + ccfg.setInterceptor(new TestInterceptor()); + + return (CacheConfiguration)ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } + + /** + * + */ + public static class TestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public TestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey that = (TestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((TestKey)o).key; + } + } + + /** + * + */ + private static class TestInterceptor extends CacheInterceptorAdapter<TestKey, TestValue> { + /** {@inheritDoc} */ + @Override public void onAfterPut(Cache.Entry<TestKey, TestValue> e) { + e.getKey(); + e.getValue(); + + UUID id = e.unwrap(Ignite.class).cluster().localNode().id(); + + BlockingQueue<Cache.Entry<TestKey, TestValue>> ents = afterPutEvts.get(id); + + if (ents == null) { + ents = new BlockingArrayQueue<>(); + + BlockingQueue<Cache.Entry<TestKey, TestValue>> oldVal = afterPutEvts.putIfAbsent(id, ents); + + ents = oldVal == null ? ents : oldVal; + } + + ents.add(e); + } + + /** {@inheritDoc} */ + @Override public void onAfterRemove(Cache.Entry<TestKey, TestValue> e) { + e.getKey(); + e.getValue(); + + UUID id = e.unwrap(Ignite.class).cluster().localNode().id(); + + BlockingQueue<Cache.Entry<TestKey, TestValue>> ents = afterRmvEvts.get(id); + + if (ents == null) { + ents = new BlockingArrayQueue<>(); + + BlockingQueue<Cache.Entry<TestKey, TestValue>> oldVal = afterRmvEvts.putIfAbsent(id, ents); + + ents = oldVal == null ? ents : oldVal; + } + + ents.add(e); + } + } + + /** + * + */ + public static class TestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public TestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue that = (TestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * + */ + protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> { + /** */ + private Object val; + + /** */ + private boolean retOld; + + /** + * @param val Value to set. + * @param retOld Return old value flag. + */ + public EntrySetValueProcessor(Object val, boolean retOld) { + this.val = val; + this.retOld = retOld; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + Object old = retOld ? e.getValue() : null; + + if (val != null) + e.setValue(val); + else + e.remove(); + + return old; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(EntrySetValueProcessor.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cead1e0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java index 9b219b7..d19ecd7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java @@ -55,6 +55,8 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheOnCopyFlagReplicatedSelfTest.class); suite.addTestSuite(GridCacheOnCopyFlagLocalSelfTest.class); suite.addTestSuite(GridCacheOnCopyFlagAtomicSelfTest.class); + suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class); + suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityTest.class); return suite; }
