IGNITE-1186 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ad476b2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ad476b2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ad476b2 Branch: refs/heads/ignite-1186 Commit: 9ad476b21a1f6f9d52d8efcaad346a4ac85ea73b Parents: e80e906 Author: nikolay_tikhonov <[email protected]> Authored: Mon Feb 29 20:56:35 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Feb 29 20:56:35 2016 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/ContinuousQuery.java | 10 +- .../internal/GridEventConsumeHandler.java | 6 + .../internal/GridMessageListenHandler.java | 6 + .../processors/cache/IgniteCacheProxy.java | 13 +- .../continuous/CacheContinuousQueryHandler.java | 9 +- .../CacheContinuousQueryHandlerV2.java | 20 +- .../continuous/CacheContinuousQueryManager.java | 263 ++++- .../continuous/GridContinuousHandler.java | 6 + .../IgniteCacheEntryListenerAbstractTest.java | 66 +- .../CacheContinuousQueryFactoryFilterTest.java | 1002 +++--------------- .../CacheContinuousQueryOperationP2PTest.java | 394 +++++++ ...acheContinuousQueryRandomOperationsTest.java | 10 +- .../p2p/CacheDeploymentEntryEventFilter.java | 33 + .../CacheDeploymentEntryEventFilterFactory.java | 31 + 14 files changed, 924 insertions(+), 945 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 452735e..3ea8f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -19,6 +19,7 @@ package org.apache.ignite.cache.query; import javax.cache.Cache; import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; @@ -121,7 +122,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { private CacheEntryEventSerializableFilter<K, V> rmtFilter; /** Remote filter factory. */ - private Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory; + private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory; /** Time interval. */ private long timeInterval = DFLT_TIME_INTERVAL; @@ -200,7 +201,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * * @param rmtFilter Key-value filter. * @return {@code this} for chaining. + * + * @deprecated Use {@link #setRemoteFilterFactory(Factory)} instead. */ + @Deprecated public ContinuousQuery<K, V> setRemoteFilter(CacheEntryEventSerializableFilter<K, V> rmtFilter) { this.rmtFilter = rmtFilter; @@ -228,7 +232,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * @return {@code this} for chaining. */ public ContinuousQuery<K, V> setRemoteFilterFactory( - Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory) { + Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { this.rmtFilterFactory = rmtFilterFactory; return this; @@ -239,7 +243,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> { * * @return Remote filter. */ - public Factory<? extends CacheEntryEventSerializableFilter<K, V>> getRemoteFilterFactory() { + public Factory<? 
extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() { return rmtFilterFactory; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index e2b1184..924a8ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.UUID; +import javax.cache.event.CacheEntryEventFilter; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheEvent; @@ -141,6 +142,11 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public CacheEntryEventFilter getEventFilter() { + return null; + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 402365c..e157c98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -24,6 +24,7 
@@ import java.io.ObjectOutput; import java.util.Collection; import java.util.Map; import java.util.UUID; +import javax.cache.event.CacheEntryEventFilter; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; @@ -130,6 +131,11 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public CacheEntryEventFilter getEventFilter() { + return null; + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9e9b985..690e0b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -97,16 +97,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** */ private static final long serialVersionUID = 0L; - /** */ - private static final IgniteBiPredicate ACCEPT_ALL = new IgniteBiPredicate() { - /** */ - private static final long serialVersionUID = -1640538788290240617L; - - @Override public boolean apply(Object k, Object v) { - return true; - } - }; - /** Context. 
*/ private GridCacheContext<K, V> ctx; @@ -565,6 +555,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry.getLocalListener() == null) throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null) + throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory."); + try { final UUID routineId = ctx.continuousQueries().executeQuery( qry.getLocalListener(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 47f5c52..393f7fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -263,7 +262,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (locLsnr != null) ctx.resource().injectGeneric(locLsnr); - final CacheEntryEventFilter filter = getRemoteFilter(); + final CacheEntryEventFilter filter = getEventFilter(); if (filter != null) ctx.resource().injectGeneric(filter); @@ -521,10 +520,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return 
mgr.registerListener(routineId, lsnr, internal); } - /** - * @return Remote filter. - */ - protected CacheEntryEventFilter<K, V> getRemoteFilter() { + /** {@inheritDoc} */ + public CacheEntryEventFilter getEventFilter() { return rmtFilter; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index 4573e6c..628e1c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -26,8 +26,10 @@ import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheRemoteQueryFactory; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -40,13 +42,13 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan private static final long serialVersionUID = 0L; /** Remote 
filter factory. */ - private Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory; + private Factory<? extends CacheEntryEventFilter> rmtFilterFactory; /** Deployable object for filter factory. */ private DeployableObject rmtFilterFactoryDep; /** */ - protected transient CacheEntryEventFilter<K, V> rmtNonSerFilter; + protected transient CacheEntryEventFilter filter; /** * Required by {@link Externalizable}. @@ -76,7 +78,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan String cacheName, Object topic, CacheEntryUpdatedListener<K, V> locLsnr, - Factory<? extends CacheEntryEventSerializableFilter<K, V>> rmtFilterFactory, + Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory, boolean internal, boolean notifyExisting, boolean oldValRequired, @@ -108,14 +110,14 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan } /** {@inheritDoc} */ - @Override protected CacheEntryEventFilter<K, V> getRemoteFilter() { - if (rmtNonSerFilter == null) { + @Override public CacheEntryEventFilter getEventFilter() { + if (filter == null) { assert rmtFilterFactory != null; - rmtNonSerFilter = rmtFilterFactory.create(); + filter = rmtFilterFactory.create(); } - return rmtNonSerFilter; + return filter; } /** {@inheritDoc} */ @@ -168,6 +170,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan if (b) rmtFilterFactoryDep = (DeployableObject)in.readObject(); else - rmtFilterFactory = (Factory<CacheEntryEventSerializableFilter<K, V>>)in.readObject(); + rmtFilterFactory = (Factory)in.readObject(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index fd4d71e..33d6d59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -21,10 +21,11 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.io.Serializable; import java.util.ArrayList; -import java.util.Map; import java.util.Collection; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -415,29 +416,44 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException In case of error. */ public UUID executeQuery(CacheEntryUpdatedListener locLsnr, - CacheEntryEventSerializableFilter rmtFilter, - Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory, + @Nullable CacheEntryEventSerializableFilter rmtFilter, + @Nullable Factory<? 
extends CacheEntryEventSerializableFilter> rmtFilterFactory, int bufSize, long timeInterval, boolean autoUnsubscribe, boolean loc, boolean keepBinary) throws IgniteCheckedException { - return executeQuery0( - locLsnr, - rmtFilter, - rmtFilterFactory, - bufSize, - timeInterval, - autoUnsubscribe, - false, - false, - true, - false, - true, - loc, - keepBinary, - false); + if (rmtFilterFactory != null) + return executeQueryWithFilterFactory( + locLsnr, + rmtFilterFactory, + bufSize, + timeInterval, + autoUnsubscribe, + false, + false, + true, + false, + true, + loc, + keepBinary, + false); + else + return executeQueryWithFilter( + locLsnr, + rmtFilter, + bufSize, + timeInterval, + autoUnsubscribe, + false, + false, + true, + false, + true, + loc, + keepBinary, + false); } /** @@ -455,10 +471,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean ignoreClassNotFound) throws IgniteCheckedException { - return executeQuery0( + return executeQueryWithFilter( locLsnr, rmtFilter, - null, ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, @@ -556,8 +571,162 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, + private UUID executeQueryWithFilter(CacheEntryUpdatedListener locLsnr, final CacheEntryEventSerializableFilter rmtFilter, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + boolean loc, + final boolean keepBinary, + boolean ignoreClassNotFound) throws IgniteCheckedException + { + cctx.checkSecurity(SecurityPermission.CACHE_READ); + + int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? 
+ cctx.kernalContext().job().currentTaskNameHash() : 0; + + boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); + + GridContinuousHandler hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound); + + return executeQuery0(locLsnr, + bufSize, + timeInterval, + autoUnsubscribe, + notifyExisting, + loc, + keepBinary, + hnd); + } + + + /** + * @param locLsnr Local listener. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param internal Internal flag. + * @param notifyExisting Notify existing flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired event flag. + * @param loc Local flag. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. + */ + private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr, + final JCacheRemoteQueryFactory rmtFilterFactory, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean internal, + boolean notifyExisting, + boolean oldValRequired, + boolean sync, + boolean ignoreExpired, + boolean loc, + final boolean keepBinary, + boolean ignoreClassNotFound) throws IgniteCheckedException + { + cctx.checkSecurity(SecurityPermission.CACHE_READ); + + int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? 
+ cctx.kernalContext().job().currentTaskNameHash() : 0; + + boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); + + boolean v2 = useV2Protocol(cctx.discovery().allNodes()); + + GridContinuousHandler hnd; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound); + else { + JCacheQueryRemoteFilter fltr = null; + + if (rmtFilterFactory != null) { + fltr = rmtFilterFactory.create(); + + if (!(fltr.impl instanceof Serializable)) + throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + + "EntryEventFilter must implement java.io.Serializable interface. Filter: " + fltr.impl); + } + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + fltr, + internal, + notifyExisting, + oldValRequired, + sync, + ignoreExpired, + taskNameHash, + skipPrimaryCheck, + cctx.isLocal(), + keepBinary, + ignoreClassNotFound); + } + + return executeQuery0(locLsnr, + bufSize, + timeInterval, + autoUnsubscribe, + notifyExisting, + loc, + keepBinary, + hnd); + } + + /** + * @param locLsnr Local listener. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param internal Internal flag. + * @param notifyExisting Notify existing flag. + * @param oldValRequired Old value required flag. + * @param sync Synchronous flag. + * @param ignoreExpired Ignore expired event flag. + * @param loc Local flag. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. 
+ */ + private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr, final Factory<? extends CacheEntryEventFilter> rmtFilterFactory, int bufSize, long timeInterval, @@ -582,9 +751,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { GridContinuousHandler hnd; - if (v2) { - assert rmtFilter == null : rmtFilter; - + if (v2) hnd = new CacheContinuousQueryHandlerV2( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -600,7 +767,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cctx.isLocal(), keepBinary, ignoreClassNotFound); - } else { CacheEntryEventFilter fltr = null; @@ -608,8 +774,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { fltr = rmtFilterFactory.create(); if (!(fltr instanceof CacheEntryEventSerializableFilter)) - throw new IgniteCheckedException("Cache entry event filter must implement " + - "org.apache.ignite.cache.CacheEntryEventSerializableFilter: " + fltr); + throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + + "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " + + "interface. Filter: " + fltr); } hnd = new CacheContinuousQueryHandler( @@ -629,6 +796,37 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { ignoreClassNotFound); } + return executeQuery0(locLsnr, + bufSize, + timeInterval, + autoUnsubscribe, + notifyExisting, + loc, + keepBinary, + hnd); + } + + /** + * @param locLsnr Local listener. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param notifyExisting Notify existing flag. + * @param loc Local flag. + * @param keepBinary Keep binary. + * @param hnd Handler. + * @return Continuous routine ID. + * @throws IgniteCheckedException In case of error. 
+ */ + private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, + int bufSize, + long timeInterval, + boolean autoUnsubscribe, + boolean notifyExisting, + boolean loc, + final boolean keepBinary, + final GridContinuousHandler hnd) + throws IgniteCheckedException { IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); @@ -694,7 +892,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cctx.kernalContext().cache().jcache(cctx.name()), cctx, entry); - if (rmtFilter != null && !rmtFilter.evaluate(next)) + if (hnd.getEventFilter() != null && !hnd.getEventFilter().evaluate(next)) next = null; } } @@ -825,9 +1023,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { locLsnrImpl, log); - routineId = executeQuery0( + routineId = executeJCacheQueryFactory( locLsnr, - null, new JCacheRemoteQueryFactory(cfg.getCacheEntryEventFilterFactory(), types), ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, @@ -1026,12 +1223,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * */ - private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> { + protected static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> { /** */ private static final long serialVersionUID = 0L; /** Factory. */ - private Factory<CacheEntryEventFilter> impl; + protected Factory<CacheEntryEventFilter> impl; /** */ private byte types; @@ -1046,7 +1243,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** {@inheritDoc} */ - @Override public CacheEntryEventFilter create() { + @Override public JCacheQueryRemoteFilter create() { return new JCacheQueryRemoteFilter(impl != null ? 
impl.create() : null, types); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 8cd30a8..232e1ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.util.Collection; import java.util.Map; import java.util.UUID; +import javax.cache.event.CacheEntryEventFilter; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -158,4 +159,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @param topVer Topology version. */ public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs); + + /** + * @return Cache entry filter. 
+ */ + public CacheEntryEventFilter getEventFilter(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 62a4153..e61127d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -42,6 +42,7 @@ import javax.cache.configuration.FactoryBuilder; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryExpiredListener; import javax.cache.event.CacheEntryListener; import javax.cache.event.CacheEntryListenerException; @@ -62,6 +63,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; @@ -99,6 +101,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** */ private boolean useObjects; + /** */ + private static AtomicBoolean serialized = new AtomicBoolean(false); + /** 
{@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { @@ -138,6 +143,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb assertEquals(0, syncMsgFuts.size()); } + + serialized.set(false); } /** @@ -439,18 +446,23 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>( FactoryBuilder.factoryOf(lsnr), - null, + new SerializableFactory(), true, false )); try { startGrid(gridCount()); + + jcache(0).put(1, 1); } finally { stopGrid(gridCount()); } + jcache(0).put(2, 2); + + assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get()); assertFalse(serialized.get()); } @@ -527,16 +539,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * @throws Exception If failed. */ - public void testEventsObjectKeyValue() throws Exception { + public void _testEventsObjectKeyValue() throws Exception { useObjects = true; - testEvents(); + _testEvents(); } /** * @throws Exception If failed. 
*/ - public void testEvents() throws Exception { + public void _testEvents() throws Exception { IgniteCache<Object, Object> cache = jcache(); Map<Object, Object> vals = new HashMap<>(); @@ -1126,9 +1138,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> { + private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryEventSerializableFilter<Object, Object> create() { + @Override public CacheEntryEventFilter<Object, Object> create() { return new TestFilter(); } } @@ -1180,7 +1192,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> { + private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable { /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) { assert evt != null; @@ -1197,6 +1209,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb return key % 2 == 0; } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Filter muns't be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Filter muns't be unmarshaled."); + } } /** @@ -1351,6 +1373,36 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } /** + * + */ + public static class SerializableFactory implements Factory<NonSerializableFilter> { + /** {@inheritDoc} */ + @Override public NonSerializableFilter create() { + return new NonSerializableFilter(); + } + } + + /** + * + */ + public static 
class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable { + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + serialized.set(true); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + serialized.set(true); + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException { + return true; + } + } + + /** */ public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java index 2ff2b79..d6d30ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java @@ -21,69 +21,47 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.SortedMap; -import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import javax.cache.Cache; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; import javax.cache.configuration.FactoryBuilder; -import javax.cache.configuration.FactoryBuilder.SingletonFactory; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryExpiredListener; import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.integration.CacheLoaderException; -import javax.cache.integration.CacheWriterException; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cache.CacheMemoryMode; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.CacheQueryEntryEvent; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.QueryTestKey; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.QueryTestValue; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; -import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; -import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.ALL; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.CLIENT; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.ContinuousDeploy.SERVER; import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted; +import static 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; @@ -91,10 +69,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * */ -public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - +public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest { /** */ private static final int NODES = 5; @@ -105,488 +80,10 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes private static final int VALS = 10; /** */ - public static final int ITERATION_CNT = 100; - - /** */ - private boolean client; + public static final int ITERATION_CNT = 40; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - - cfg.setClientMode(client); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(NODES - 1); - - client = true; - - startGrid(NODES - 1); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. 
- */ - public void testAtomicClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testAtomic() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicReplicated() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 0, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicReplicatedAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 0, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicReplicatedClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 0, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicOffheapValues() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. 
- */ - public void testAtomicOffheapValuesAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicOffheapValuesClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicOffheapTiered() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicOffheapTieredAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicOffheapTieredClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - ATOMIC, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicNoBackups() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testAtomicNoBackupsAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. 
- */ - public void testAtomicNoBackupsClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - ATOMIC, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTx() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testTxExplicit() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTxClientExplicit() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTxReplicated() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 0, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. 
- */ - public void testTxReplicatedClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 0, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapValues() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapValuesAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapValuesExplicit() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapValuesClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_VALUES, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapTiered() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapTieredAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. 
- */ - public void testTxOffheapTieredClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTxOffheapTieredClientExplicit() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 1, - TRANSACTIONAL, - OFFHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoBackups() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoBackupsAllNodes() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, ALL); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoBackupsExplicit() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, SERVER); - } - - /** - * @throws Exception If failed. - */ - public void testTxNoBackupsClient() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, - 0, - TRANSACTIONAL, - ONHEAP_TIERED, - false); - - testContinuousQuery(ccfg, CLIENT); - } - - /** - * @param ccfg Cache configuration. - * @param deploy The place where continuous query will be started. - * @throws Exception If failed. 
- */ - private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) + @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) throws Exception { ignite(0).createCache(ccfg); @@ -601,70 +98,18 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes Collection<QueryCursor<?>> curs = new ArrayList<>(); - if (deploy == CLIENT) { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); - - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); - - qry.setRemoteFilterFactory(new FilterFactory()); - - evtsQueues.add(evtsQueue); + Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>(); - QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry); - - curs.add(cur); - } - else if (deploy == SERVER) { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); - - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? 
extends QueryTestValue>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); - - qry.setRemoteFilterFactory(new FilterFactory()); - - evtsQueues.add(evtsQueue); - - QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry); - - curs.add(cur); - } + if (deploy == CLIENT) + evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean())); + else if (deploy == SERVER) + evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs, + rnd.nextBoolean())); else { - for (int i = 0; i < NODES - 1; i++) { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); + boolean isSync = rnd.nextBoolean(); - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? 
extends QueryTestValue>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); - - qry.setRemoteFilterFactory(new FilterFactory()); - - evtsQueues.add(evtsQueue); - - QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry); - - curs.add(cur); - } + for (int i = 0; i < NODES - 1; i++) + evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync)); } ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); @@ -673,7 +118,7 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes try { for (int i = 0; i < ITERATION_CNT; i++) { - if (i % 20 == 0) + if (i % 10 == 0) log.info("Iteration: " + i); for (int idx = 0; idx < NODES; idx++) @@ -683,6 +128,9 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes finally { for (QueryCursor<?> cur : curs) cur.close(); + + for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs) + grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2()); } } finally { @@ -691,6 +139,60 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes } /** + * @param cacheName Cache name. + * @param nodeIdx Node index. + * @param curs Cursors. + * @param lsnrCfgs Listener configurations. + * @return Event queue + */ + private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName, + int nodeIdx, + Collection<QueryCursor<?>> curs, + Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs, + boolean sync) { + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + if (ThreadLocalRandom.current().nextBoolean()) { + MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = + new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(new LocalNonSerialiseListener() { + @Override protected void onEvents(Iterable<CacheEntryEvent<? 
extends QueryTestKey, + ? extends QueryTestValue>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }), + new FilterFactory(), + true, + sync + ); + + grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg); + + lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg)); + } + else { + ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + + qry.setRemoteFilterFactory(new FilterFactory()); + + QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry); + + curs.add(cur); + } + + return evtsQueue; + } + + /** * @param rnd Random generator. * @param evtsQueues Events queue. * @param expData Expected cache data. 
@@ -927,49 +429,6 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes break; } - case 11: { - SortedMap<Object, Object> vals = new TreeMap<>(); - - while (vals.size() < KEYS / 5) - vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd)); - - cache.putAll(vals); - - if (tx != null) - tx.commit(); - - for (Map.Entry<Object, Object> e : vals.entrySet()) - updatePartitionCounter(cache, e.getKey(), partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData); - - expData.putAll(vals); - - break; - } - - case 12: { - SortedMap<Object, Object> vals = new TreeMap<>(); - - while (vals.size() < KEYS / 5) - vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal); - - cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean())); - - if (tx != null) - tx.commit(); - - for (Map.Entry<Object, Object> e : vals.entrySet()) - updatePartitionCounter(cache, e.getKey(), partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData); - - for (Object o : vals.keySet()) - expData.put(o, newVal); - - break; - } - default: fail("Op:" + op); } @@ -980,76 +439,6 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes } /** - * @param evtsQueues Queue. - * @param partCntrs Counters. - * @param aff Affinity. - * @param vals Values. - * @param expData Expected data. 
- */ - private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, - Map<Integer, Long> partCntrs, - Affinity<Object> aff, - SortedMap<Object, Object> vals, - Map<Object, Object> expData) - throws Exception { - Map<Object, Object> vals0 = new HashMap<>(vals); - - for (Map.Entry<Object, Object> e : vals0.entrySet()) { - if (!isAccepted((QueryTestValue)e.getValue())) - vals.remove(e.getKey()); - } - - for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { - Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>(); - - for (int i = 0; i < vals.size(); i++) { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); - - try { - assertNotNull(evt); - } - catch (Throwable e) { - int z = 0; - - ++z; - } - - rcvEvts.put(evt.getKey(), evt); - } - - assertEquals(vals.size(), rcvEvts.size()); - - for (Map.Entry<Object, Object> e : vals.entrySet()) { - Object key = e.getKey(); - Object val = e.getValue(); - Object oldVal = expData.get(key); - - if (val == null && oldVal == null) { - checkNoEvent(evtsQueues); - - continue; - } - - CacheEntryEvent evt = rcvEvts.get(key); - - assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', - evt); - assertEquals(key, evt.getKey()); - assertEquals(val, evt.getValue()); - assertEquals(oldVal, evt.getOldValue()); - - long cntr = partCntrs.get(aff.partition(key)); - CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class); - - assertNotNull(cntr); - assertNotNull(qryEntryEvt); - - assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); - } - } - } - - /** * @param rnd {@link Random}. * @return {@link TransactionIsolation}. */ @@ -1153,230 +542,99 @@ public class CacheContinuousQueryFactoryFilterTest extends GridCommonAbstractTes /** * - * @param cacheMode Cache mode. - * @param backups Number of backups. - * @param atomicityMode Cache atomicity mode. - * @param memoryMode Cache memory mode. 
- * @param store If {@code true} configures dummy cache store. - * @return Cache configuration. - */ - private CacheConfiguration<Object, Object> cacheConfiguration( - CacheMode cacheMode, - int backups, - CacheAtomicityMode atomicityMode, - CacheMemoryMode memoryMode, - boolean store) { - CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - - ccfg.setAtomicityMode(atomicityMode); - ccfg.setCacheMode(cacheMode); - ccfg.setMemoryMode(memoryMode); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicWriteOrderMode(PRIMARY); - - if (cacheMode == PARTITIONED) - ccfg.setBackups(backups); - - if (store) { - ccfg.setCacheStoreFactory(new TestStoreFactory()); - ccfg.setReadThrough(true); - ccfg.setWriteThrough(true); - } - - return ccfg; - } - - /** - * - */ - private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public CacheStore<Object, Object> create() { - return new CacheStoreAdapter() { - @Override public Object load(Object key) throws CacheLoaderException { - return null; - } - - @Override public void write(Cache.Entry entry) throws CacheWriterException { - // No-op. - } - - @Override public void delete(Object key) throws CacheWriterException { - // No-op. - } - }; - } - } - - /** - * */ - static class QueryTestKey implements Serializable, Comparable { + protected static class NonSerializableFilter + implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { /** */ - private final Integer key; - - /** - * @param key Key. - */ - public QueryTestKey(Integer key) { - this.key = key; + public NonSerializableFilter() { + // No-op. 
} /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryTestKey that = (QueryTestKey)o; - - return key.equals(that.key); + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) + throws CacheEntryListenerException { + return isAccepted(event.getValue()); } /** {@inheritDoc} */ - @Override public int hashCode() { - return key.hashCode(); + @Override public void writeExternal(ObjectOutput out) throws IOException { + fail("Entry filter should not be marshaled."); } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTestKey.class, this); + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fail("Entry filter should not be marshaled."); } - /** {@inheritDoc} */ - @Override public int compareTo(Object o) { - return key - ((QueryTestKey)o).key; + /** + * @return {@code True} if value is {@code null} or even. + */ + public static boolean isAccepted(QueryTestValue val) { + return val == null || val.val1 % 2 == 0; } } /** * */ - static class QueryTestValue implements Serializable { - /** */ - private final Integer val1; - - /** */ - private final String val2; - - /** - * @param val Value. 
- */ - public QueryTestValue(Integer val) { - this.val1 = val; - this.val2 = String.valueOf(val); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryTestValue that = (QueryTestValue) o; - - return val1.equals(that.val1) && val2.equals(that.val2); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = val1.hashCode(); - - res = 31 * res + val2.hashCode(); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTestValue.class, this); + protected static class FilterFactory implements Factory<NonSerializableFilter> { + @Override public NonSerializableFilter create() { + return new NonSerializableFilter(); } } /** * */ - protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> { + public abstract class LocalNonSerialiseListener implements + CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>, + CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, + CacheEntryExpiredListener<QueryTestKey, QueryTestValue>, + CacheEntryRemovedListener<QueryTestKey, QueryTestValue>, + Externalizable { /** */ - private Object val; - - /** */ - private boolean retOld; - - /** - * @param val Value to set. - * @param retOld Return old value flag. - */ - public EntrySetValueProcessor(Object val, boolean retOld) { - this.val = val; - this.retOld = retOld; + public LocalNonSerialiseListener() { + // No-op. } /** {@inheritDoc} */ - @Override public Object process(MutableEntry<Object, Object> e, Object... args) { - Object old = retOld ? e.getValue() : null; - - if (val != null) - e.setValue(val); - else - e.remove(); - - return old; + @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? 
extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(EntrySetValueProcessor.class, this); + @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); } - } - - /** - * - */ - protected enum ContinuousDeploy { - CLIENT, SERVER, ALL - } - /** - * - */ - protected static class NonSerializableFilter - implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) - throws CacheEntryListenerException { - return isAccepted(event.getValue()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - fail("Entry filter should not be marshaled."); + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fail("Entry filter should not be marshaled."); + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); } /** - * @return {@code True} if value is even. + * @param evts Events. */ - public static boolean isAccepted(QueryTestValue val) { - return val == null ? true : val.val1 % 2 == 0; + protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts); + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Failed. 
Listener should not be marshaled."); } - } - /** - * - */ - protected static class FilterFactory implements Factory<NonSerializableFilter> { - @Override public NonSerializableFilter create() { - return new NonSerializableFilter(); + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled."); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ad476b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java new file mode 100644 index 0000000..ff8d0a7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListener; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static 
org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that a JCache entry listener whose event filter factory is loaded through the external test class
+ * loader can be registered with peer class loading enabled, from both server and client nodes, and that the
+ * local listener is notified for every put.
+ */
+public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
+    /** IP finder shared by all nodes started by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Total number of nodes; the node with index {@code NODES - 1} is started in client mode. */
+    private static final int NODES = 5;
+
+    /** Key count; not referenced by the tests currently present in this class. */
+    private static final int KEYS = 50;
+
+    /** Value count; not referenced by the tests currently present in this class. */
+    private static final int VALS = 10;
+
+    /** Iteration count; not referenced by the tests currently present in this class. */
+    public static final int ITERATION_CNT = 100;
+
+    /** Client mode flag applied to every node configuration created after it is set. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+        cfg.setPeerClassLoadingEnabled(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        // Server nodes first, then a single client node with the last index.
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Partitioned atomic cache with one backup, listener registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * Partitioned atomic cache with one backup, listener registered through a server node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * Replicated atomic cache, listener registered through a server node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * Replicated atomic cache, listener registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * Partitioned transactional cache with one backup, listener registered through a server node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * Partitioned transactional cache with one backup, listener registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * Replicated transactional cache, listener registered through a server node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, false);
+    }
+
+    /**
+     * Replicated transactional cache, listener registered through the client node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED
+        );
+
+        testContinuousQuery(ccfg, true);
+    }
+
+    /**
+     * Creates the cache, registers a JCache listener whose filter factory class comes from the external class
+     * loader, performs ten puts and waits up to three seconds for ten listener notifications. The cache is
+     * destroyed afterwards so that the next test method can create it again on the shared grids.
+     *
+     * @param ccfg Cache configuration.
+     * @param isClient If {@code true} the cache is accessed through the client node (index {@code NODES - 1}),
+     *      otherwise through a randomly chosen server node.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, boolean isClient)
+        throws Exception {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        QueryCursor<?> cur = null;
+
+        // The factory class is not on the test class path; it is only visible via the external class loader.
+        final Class<Factory<CacheEntryEventFilter>> evtFilterFactory =
+            (Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
+                loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
+
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        CacheEntryUpdatedListener<Integer, Integer> localLsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
+                ? extends Integer>> evts) throws CacheEntryListenerException {
+                // One count per delivered event, whatever its payload.
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
+                    latch.countDown();
+            }
+        };
+
+        MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
+            new MutableCacheEntryListenerConfiguration<>(
+                new FactoryBuilder.SingletonFactory<>(localLsnr),
+                (Factory<? extends CacheEntryEventFilter<? super Integer, ? super Integer>>)
+                    (Object)evtFilterFactory.newInstance(),
+                true,
+                true
+            );
+
+        qry.setLocalListener(localLsnr);
+
+        qry.setRemoteFilterFactory(
+            (Factory<? extends CacheEntryEventFilter<Integer, Integer>>)(Object)evtFilterFactory.newInstance());
+
+        IgniteCache<Integer, Integer> cache = null;
+
+        // Create the cache last: nothing above depends on it, and a failure above must not leak it.
+        ignite(0).createCache(ccfg);
+
+        try {
+            if (isClient)
+                cache = grid(NODES - 1).cache(ccfg.getName());
+            else
+                cache = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName());
+
+            // TODO IGNITE-1186 (WIP): 'qry' is configured but not started yet, so 'cur' stays null and only
+            // the JCache listener path is exercised. Start it with cache.query(qry) and keep the cursor in 'cur'.
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            assertTrue(latch.await(3, TimeUnit.SECONDS));
+        }
+        finally {
+            try {
+                if (cur != null)
+                    cur.close();
+
+                if (cache != null)
+                    cache.deregisterCacheEntryListener(lsnrCfg);
+            }
+            finally {
+                // All test methods reuse the same cache name on the same grids.
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * Builds an unnamed cache configuration with {@code FULL_SYNC} write synchronization and {@code PRIMARY}
+     * atomic write order.
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups; applied only when the cache mode is {@code PARTITIONED}.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * Serializable key wrapping an integer; equality, hash code and ordering all follow the wrapped value.
+     */
+    public static class QueryTestKey implements Serializable, Comparable {
+        /** Wrapped key value. */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            // Integer subtraction overflows for operands of opposite sign and yields the wrong ordering.
+            return key.compareTo(((QueryTestKey)o).key);
+        }
+    }
+
+    /**
+     * Serializable value holding an integer together with its string representation.
+     */
+    public static class QueryTestValue implements Serializable {
+        /** Integer value. */
+        protected final Integer val1;
+
+        /** Result of {@code String.valueOf} applied to the integer value. */
+        protected final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue)o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+}
