IGNITE-1186 Added P2P tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5455a9fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5455a9fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5455a9fe Branch: refs/heads/ignite-1186 Commit: 5455a9feeeb95841bb4a1a8a0ca70972586c2711 Parents: 9ad476b Author: nikolay_tikhonov <[email protected]> Authored: Tue Mar 1 13:38:11 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Mar 1 13:41:33 2016 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryHandlerV2.java | 50 ++++++- .../continuous/CacheContinuousQueryManager.java | 63 ++++----- .../IgniteCacheEntryListenerAbstractTest.java | 6 +- .../CacheContinuousQueryOperationP2PTest.java | 130 +++++-------------- .../IgniteCacheQuerySelfTestSuite.java | 4 + 5 files changed, 107 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5455a9fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index 628e1c3..6fc2041 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -27,12 +27,11 @@ import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteDeploymentCheckedException; -import org.apache.ignite.internal.managers.deployment.GridDeployment; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheRemoteQueryFactory; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; /** * Continuous query handler V2 version. Contains {@link Factory} for remote listener. @@ -47,6 +46,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan /** Deployable object for filter factory. */ private DeployableObject rmtFilterFactoryDep; + /** Event types for JCache API. */ + private byte types = 0; + /** */ protected transient CacheEntryEventFilter filter; @@ -73,6 +75,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan * @param taskHash Task name hash code. * @param locCache {@code True} if local cache. * @param keepBinary Keep binary flag. + * @param types Event types. */ public CacheContinuousQueryHandlerV2( String cacheName, @@ -88,7 +91,8 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan boolean skipPrimaryCheck, boolean locCache, boolean keepBinary, - boolean ignoreClsNotFound) { + boolean ignoreClsNotFound, + @Nullable Byte types) { super(cacheName, topic, locLsnr, @@ -107,6 +111,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan assert rmtFilterFactory != null; this.rmtFilterFactory = rmtFilterFactory; + + if (types != null) + this.types = types; } /** {@inheritDoc} */ @@ -114,6 +121,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan if (filter == null) { assert rmtFilterFactory != null; + if (types != 0) + rmtFilterFactory = new JCacheRemoteQueryFactory(rmtFilterFactory, types); + filter = rmtFilterFactory.create(); } @@ -158,6 +168,8 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan out.writeObject(rmtFilterFactoryDep); else out.writeObject(rmtFilterFactory); + + out.writeByte(types); } /** {@inheritDoc} */ @@ -171,5 +183,35 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan rmtFilterFactoryDep = (DeployableObject)in.readObject(); else rmtFilterFactory = (Factory)in.readObject(); + + types = in.readByte(); + } + + /** + * + */ + private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> { + /** */ + private static final long serialVersionUID = 0L; + + /** Factory. */ + protected Factory<? extends CacheEntryEventFilter> impl; + + /** */ + private byte types; + + /** + * @param impl Factory. + * @param types Types. + */ + public JCacheRemoteQueryFactory(@Nullable Factory<? extends CacheEntryEventFilter> impl, byte types) { + this.impl = impl; + this.types = types; + } + + /** {@inheritDoc} */ + @Override public JCacheQueryRemoteFilter create() { + return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5455a9fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 33d6d59..2a05865 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -621,6 +621,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param locLsnr Local listener. + * @param types JCache event types. * @param bufSize Buffer size. * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. @@ -634,7 +635,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException In case of error. */ private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr, - final JCacheRemoteQueryFactory rmtFilterFactory, + final Factory<CacheEntryEventFilter> rmtFilterFactory, + byte types, int bufSize, long timeInterval, boolean autoUnsubscribe, @@ -647,6 +649,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { final boolean keepBinary, boolean ignoreClassNotFound) throws IgniteCheckedException { + assert types != 0 : types; + cctx.checkSecurity(SecurityPermission.CACHE_READ); int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? @@ -654,7 +658,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); - boolean v2 = useV2Protocol(cctx.discovery().allNodes()); + boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes()); GridContinuousHandler hnd; @@ -673,23 +677,28 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { skipPrimaryCheck, cctx.isLocal(), keepBinary, - ignoreClassNotFound); + ignoreClassNotFound, + types); else { - JCacheQueryRemoteFilter fltr = null; + JCacheQueryRemoteFilter jCacheFilter; + + CacheEntryEventFilter filter = null; if (rmtFilterFactory != null) { - fltr = rmtFilterFactory.create(); + filter = rmtFilterFactory.create(); - if (!(fltr.impl instanceof Serializable)) + if (!(filter instanceof Serializable)) throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + - "EntryEventFilter must implement java.io.Serializable interface. Filter: " + fltr.impl); + "EntryEventFilter must implement java.io.Serializable interface. Filter: " + filter); } + jCacheFilter = new JCacheQueryRemoteFilter(filter, types); + hnd = new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), locLsnr, - fltr, + jCacheFilter, internal, notifyExisting, oldValRequired, @@ -766,7 +775,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { skipPrimaryCheck, cctx.isLocal(), keepBinary, - ignoreClassNotFound); + ignoreClassNotFound, + (byte)0); else { CacheEntryEventFilter fltr = null; @@ -1025,7 +1035,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { routineId = executeJCacheQueryFactory( locLsnr, - new JCacheRemoteQueryFactory(cfg.getCacheEntryEventFilterFactory(), types), + cfg.getCacheEntryEventFilterFactory(), + types, ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, @@ -1139,7 +1150,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * For handler version 2.0 this filter should not be serialized. */ - private static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable { + protected static class JCacheQueryRemoteFilter implements CacheEntryEventSerializableFilter, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -1164,7 +1175,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param impl Filter. * @param types Types. */ - JCacheQueryRemoteFilter(CacheEntryEventFilter impl, byte types) { + JCacheQueryRemoteFilter(@Nullable CacheEntryEventFilter impl, byte types) { assert types != 0; this.impl = impl; @@ -1221,34 +1232,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** - * - */ - protected static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> { - /** */ - private static final long serialVersionUID = 0L; - - /** Factory. */ - protected Factory<CacheEntryEventFilter> impl; - - /** */ - private byte types; - - /** - * @param impl Factory. - * @param types Types. - */ - public JCacheRemoteQueryFactory(@Nullable Factory<CacheEntryEventFilter> impl, byte types) { - this.impl = impl; - this.types = types; - } - - /** {@inheritDoc} */ - @Override public JCacheQueryRemoteFilter create() { - return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types); - } - } - - /** * Task flash backup queue. */ private static final class BackupCleaner implements Runnable { http://git-wip-us.apache.org/repos/asf/ignite/blob/5455a9fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index e61127d..43ca283 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -539,16 +539,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * @throws Exception If failed. */ - public void _testEventsObjectKeyValue() throws Exception { + public void testEventsObjectKeyValue() throws Exception { useObjects = true; - _testEvents(); + testEvents(); } /** * @throws Exception If failed. */ - public void _testEvents() throws Exception { + public void testEvents() throws Exception { IgniteCache<Object, Object> cache = jcache(); Map<Object, Object> vals = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5455a9fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java index ff8d0a7..97f9e0e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java @@ -17,28 +17,25 @@ package org.apache.ignite.internal.processors.cache.query.continuous; -import java.io.Serializable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import javax.cache.configuration.Factory; import javax.cache.configuration.FactoryBuilder; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; -import javax.cache.event.CacheEntryListener; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -63,15 +60,6 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest private static final int NODES = 5; /** */ - private static final int KEYS = 50; - - /** */ - private static final int VALS = 10; - - /** */ - public static final int ITERATION_CNT = 100; - - /** */ private boolean client; /** {@inheritDoc} */ @@ -87,8 +75,15 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); startGridsMultiThreaded(NODES - 1); @@ -98,10 +93,10 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest } /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); + @Override protected void afterTest() throws Exception { + super.afterTest(); - super.afterTestsStopped(); + stopAllGrids(); } /** @@ -228,11 +223,14 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); - CacheEntryUpdatedListener<Integer, Integer> localLsnr = new CacheEntryUpdatedListener<Integer, Integer>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, - ? extends Integer>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + TestLocalListener localLsnr = new TestLocalListener() { + @Override public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { latch.countDown(); + + log.info("Received event: " + evt); + } } }; @@ -258,7 +256,7 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest else cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()); - //cur = cache.query(qry); + cur = cache.query(qry); cache.registerCacheEntryListener(lsnrCfg); @@ -306,89 +304,23 @@ public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest /** * */ - public static class QueryTestKey implements Serializable, Comparable { - /** */ - private final Integer key; - - /** - * @param key Key. - */ - public QueryTestKey(Integer key) { - this.key = key; - } - + private static abstract class TestLocalListener implements CacheEntryUpdatedListener<Integer, Integer>, + CacheEntryCreatedListener<Integer, Integer> { /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryTestKey that = (QueryTestKey)o; - - return key.equals(that.key); + @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) + throws CacheEntryListenerException { + onEvent(evts); } /** {@inheritDoc} */ - @Override public int hashCode() { - return key.hashCode(); + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) + throws CacheEntryListenerException { + onEvent(evts); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTestKey.class, this); - } - - /** {@inheritDoc} */ - @Override public int compareTo(Object o) { - return key - ((QueryTestKey)o).key; - } - } - - /** - * - */ - public static class QueryTestValue implements Serializable { - /** */ - protected final Integer val1; - - /** */ - protected final String val2; - /** - * @param val Value. + * @param evts Events. */ - public QueryTestValue(Integer val) { - this.val1 = val; - this.val2 = String.valueOf(val); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryTestValue that = (QueryTestValue) o; - - return val1.equals(that.val1) && val2.equals(that.val2); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = val1.hashCode(); - - res = 31 * res + val2.hashCode(); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTestValue.class, this); - } + protected abstract void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5455a9fe/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 457ab9b..5df10a7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -78,12 +78,14 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest; @@ -224,6 +226,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class); + suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class); + suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); suite.addTestSuite(CacheContinuousBatchAckTest.class); suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
