IGNITE-3004 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0986fa98 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0986fa98 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0986fa98 Branch: refs/heads/ignite-3004 Commit: 0986fa988f4ce0155351a9fa577aca59893d3f9f Parents: bfeb3b6 Author: Tikhonov Nikolay <[email protected]> Authored: Sun Apr 17 18:19:14 2016 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Sun Apr 17 18:19:14 2016 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryVariationsTest.java | 236 ++++++++++++++++--- .../IgniteBinaryCacheQueryTestSuite4.java | 4 +- 2 files changed, 205 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0986fa98/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java index 8aae50f..c85246b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryVariationsTest.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -28,14 +32,22 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryExpiredListener; +import javax.cache.event.CacheEntryListener; import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; @@ -55,6 +67,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static javax.cache.event.EventType.CREATED; import static javax.cache.event.EventType.REMOVED; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryVariationsTest.SerializableFilter.isAccepted; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; @@ -75,51 +88,132 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati /** * @throws Exception If failed. */ - private void testRandomSingleOperation() throws Exception { - long seed = System.currentTimeMillis(); + public void testRandomSingleOperationJCacheApi() throws Exception { + testRandomSingleOperation(true, false, false); + } - Random rnd = new Random(seed); + /** + * @throws Exception If failed. + */ + public void testRandomSingleOperationJCacheApiWithFilter() throws Exception { + testRandomSingleOperation(true, false, true); + } - log.info("Random seed: " + seed); + /** + * @throws Exception If failed. + */ + public void testRandomSingleOperationJCacheApiSync() throws Exception { + testRandomSingleOperation(true, true, false); + } - // Register listener on all nodes. - List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); + /** + * @throws Exception If failed. + */ + public void testRandomSingleOperationJCacheApiSyncWithFilter() throws Exception { + testRandomSingleOperation(true, true, true); + } - Collection<QueryCursor<?>> curs = new ArrayList<>(); + /** + * @throws Exception If failed. + */ + public void testRandomSingleOperation() throws Exception { + testRandomSingleOperation(true, true, false); + } - for (int idx = 0; idx < G.allGrids().size(); idx++) { - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + /** + * @throws Exception If failed. + */ + public void testRandomSingleOperationWithFilter() throws Exception { + testRandomSingleOperation(true, true, true); + } - ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + /** + * @param jcacheApi Use JCache API. + * @param syncNtf Use sync notification. + * @param withFilter Use filter. + * @throws Exception If failed. + */ + private void testRandomSingleOperation(final boolean jcacheApi, final boolean syncNtf, final boolean withFilter) + throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + long seed = System.currentTimeMillis(); - qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) - throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); + Random rnd = new Random(seed); - curs.add(jcache(idx).query(qry)); + log.info("Random seed: " + seed); - evtsQueues.add(evtsQueue); - } + // Register listener on all nodes. + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); - ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + Collection<QueryCursor<?>> curs = new ArrayList<>(); - try { - for (int i = 0; i < ITERATION_CNT; i++) { - if (i % 20 == 0) - log.info("Iteration: " + i); + Collection<MutableCacheEntryListenerConfiguration> lsnrCfgs = new ArrayList<>(); + + for (int idx = 0; idx < G.allGrids().size(); idx++) { + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + final CacheEntryUpdatedListener<Object, Object> lsnr = + new LocalNonSerialiseListener() { + @Override protected void onEvents(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }; + + if (jcacheApi) { + MutableCacheEntryListenerConfiguration<Object, Object> lsnrCfg = + new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<? super Object, ? super Object>>() { + @Override public CacheEntryListener<? super Object, ? super Object> create() { + return lsnr; + } + }, + withFilter ? FactoryBuilder.factoryOf(SerializableFilter.class) : null, + true, + syncNtf + ); + + jcache(idx).registerCacheEntryListener(lsnrCfg); + + lsnrCfgs.add(lsnrCfg); + } + else { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + qry.setRemoteFilterFactory(withFilter ? + FactoryBuilder.factoryOf(SerializableFilter.class) : null); - for (int idx = 0; idx < G.allGrids().size(); idx++) - randomUpdate(rnd, evtsQueues, expData, jcache(idx)); + curs.add(jcache(idx).query(qry)); + + evtsQueues.add(evtsQueue); + } + } + + ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 20 == 0) + log.info("Iteration: " + i); + + for (int idx = 0; idx < G.allGrids().size(); idx++) + randomUpdate(rnd, evtsQueues, expData, jcache(idx)); + } + } + finally { + for (QueryCursor<?> cur : curs) + cur.close(); + + for (int i = 0; i < G.allGrids().size(); i++) { + for (MutableCacheEntryListenerConfiguration cfg : lsnrCfgs) + jcache(i).deregisterCacheEntryListener(cfg); + } + } } - } - finally { - for (QueryCursor<?> cur : curs) - cur.close(); - } + }); } /** @@ -383,7 +477,7 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati Object val, Object oldVal) throws Exception { - if (val == null && oldVal == null) { + if (val == null && oldVal == null || (val != null && !isAccepted(val))) { checkNoEvent(evtsQueues); return; @@ -677,4 +771,80 @@ public class CacheContinuousQueryVariationsTest extends IgniteCacheConfigVariati return S.toString(EntrySetValueProcessor.class, this); } } + + /** + * + */ + protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Object, Object> { + /** */ + public SerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<?, ?> event) + throws CacheEntryListenerException { + return isAccepted(event.getValue()); + } + + /** + * @return {@code True} if value is even. + */ + public static boolean isAccepted(Object val) { + if (val != null) + assert val instanceof TestObject; + + return val == null || ((TestObject)val).value() % 2 == 0; + } + } + + /** + * + */ + public abstract class LocalNonSerialiseListener implements + CacheEntryUpdatedListener<Object, Object>, + CacheEntryCreatedListener<Object, Object>, + CacheEntryExpiredListener<Object, Object>, + CacheEntryRemovedListener<Object, Object>, + Externalizable { + /** */ + public LocalNonSerialiseListener() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** + * @param evts Events. + */ + protected abstract void onEvents(Iterable<CacheEntryEvent<?, ?>> evts); + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Failed. Listener should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled."); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0986fa98/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java index 32a693f..75ac68a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java @@ -30,9 +30,9 @@ public class IgniteBinaryCacheQueryTestSuite4 extends TestSuite { * @throws Exception In case of error. */ public static TestSuite suite() throws Exception { - GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); + //GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); - TestSuite suite = IgniteCacheQuerySelfTestSuite4.suite(); + TestSuite suite = IgniteCacheQuerySelfTestSuite5.suite();//IgniteCacheQuerySelfTestSuite4.suite(); return suite; }
