Repository: ignite Updated Branches: refs/heads/ignite-1186 950eea125 -> c26aa48a3
Merge branch 'master' into ignite-1186 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c26aa48a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c26aa48a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c26aa48a Branch: refs/heads/ignite-1186 Commit: c26aa48a3da70a59cf684381fabb6c19965bbd96 Parents: 950eea1 Author: nikolay_tikhonov <[email protected]> Authored: Tue Mar 1 16:54:15 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Mar 1 16:54:15 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 6 - .../internal/GridMessageListenHandler.java | 6 - .../continuous/CacheContinuousQueryHandler.java | 57 ++- .../CacheContinuousQueryHandlerV2.java | 52 +-- .../continuous/CacheContinuousQueryManager.java | 458 +++++++------------ .../continuous/GridContinuousHandler.java | 5 - .../CacheContinuousQueryFactoryFilterTest.java | 78 +++- ...acheContinuousQueryRandomOperationsTest.java | 2 +- 8 files changed, 272 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 924a8ea..e2b1184 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.UUID; -import javax.cache.event.CacheEntryEventFilter; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheEvent; @@ -142,11 +141,6 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public CacheEntryEventFilter getEventFilter() { - return null; - } - - /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index e157c98..402365c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -24,7 +24,6 @@ import java.io.ObjectOutput; import java.util.Collection; import java.util.Map; import java.util.UUID; -import javax.cache.event.CacheEntryEventFilter; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; @@ -131,11 +130,6 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public CacheEntryEventFilter getEventFilter() { - return null; - } - - /** {@inheritDoc} */ @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 393f7fb..10fbd89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -168,30 +168,18 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @param topic Topic for ordered messages. * @param locLsnr Local listener. * @param rmtFilter Remote filter. - * @param internal Internal flag. - * @param notifyExisting Notify existing flag. * @param oldValRequired Old value required flag. * @param sync Synchronous flag. * @param ignoreExpired Ignore expired events flag. - * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. - * @param taskHash Task name hash code. - * @param locCache {@code True} if local cache. - * @param keepBinary Keep binary flag. */ public CacheContinuousQueryHandler( String cacheName, Object topic, CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventSerializableFilter<K, V> rmtFilter, - boolean internal, - boolean notifyExisting, boolean oldValRequired, boolean sync, boolean ignoreExpired, - int taskHash, - boolean skipPrimaryCheck, - boolean locCache, - boolean keepBinary, boolean ignoreClsNotFound) { assert topic != null; assert locLsnr != null; @@ -200,20 +188,49 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler this.topic = topic; this.locLsnr = locLsnr; this.rmtFilter = rmtFilter; - this.internal = internal; - this.notifyExisting = notifyExisting; this.oldValRequired = oldValRequired; this.sync = sync; this.ignoreExpired = ignoreExpired; - this.taskHash = taskHash; - this.skipPrimaryCheck = skipPrimaryCheck; - this.locCache = locCache; - this.keepBinary = keepBinary; this.ignoreClsNotFound = ignoreClsNotFound; cacheId = CU.cacheId(cacheName); } + /** + * @param internal Internal query. + */ + public void internal(boolean internal) { + this.internal = internal; + } + + /** + * @param notifyExisting Notify existing. + */ + public void notifyExisting(boolean notifyExisting) { + this.notifyExisting = notifyExisting; + } + + /** + * @param locCache Local cache. + */ + public void localCache(boolean locCache) { + this.locCache = locCache; + } + + /** + * @param taskHash Task hash. + */ + public void taskNameHash(int taskHash) { + this.taskHash = taskHash; + } + + /** + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. + */ + public void skipPrimaryCheck(boolean skipPrimaryCheck) { + this.skipPrimaryCheck = skipPrimaryCheck; + } + /** {@inheritDoc} */ @Override public boolean isEvents() { return false; @@ -520,7 +537,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return mgr.registerListener(routineId, lsnr, internal); } - /** {@inheritDoc} */ + /** + * @return Cache entry event filter. + */ public CacheEntryEventFilter getEventFilter() { return rmtFilter; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java index a660dce..7aef4dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java @@ -66,15 +66,9 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan * @param topic Topic for ordered messages. * @param locLsnr Local listener. * @param rmtFilterFactory Remote filter factory. - * @param internal Internal flag. - * @param notifyExisting Notify existing flag. * @param oldValRequired Old value required flag. * @param sync Synchronous flag. * @param ignoreExpired Ignore expired events flag. - * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. - * @param taskHash Task name hash code. - * @param locCache {@code True} if local cache. - * @param keepBinary Keep binary flag. * @param types Event types. */ public CacheContinuousQueryHandlerV2( @@ -82,30 +76,18 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan Object topic, CacheEntryUpdatedListener<K, V> locLsnr, Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory, - boolean internal, - boolean notifyExisting, boolean oldValRequired, boolean sync, boolean ignoreExpired, - int taskHash, - boolean skipPrimaryCheck, - boolean locCache, - boolean keepBinary, boolean ignoreClsNotFound, @Nullable Byte types) { super(cacheName, topic, locLsnr, null, - internal, - notifyExisting, oldValRequired, sync, ignoreExpired, - taskHash, - skipPrimaryCheck, - locCache, - keepBinary, ignoreClsNotFound); assert rmtFilterFactory != null; @@ -126,10 +108,10 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory; - if (types != 0) - factory = new JCacheRemoteQueryFactory(rmtFilterFactory, types); - filter = factory.create(); + + if (types != 0) + filter = new JCacheQueryRemoteFilter(filter, types); } return filter; @@ -191,32 +173,4 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan types = in.readByte(); } - - /** - * - */ - private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> { - /** */ - private static final long serialVersionUID = 0L; - - /** Factory. */ - protected Factory<? extends CacheEntryEventFilter> impl; - - /** */ - private byte types; - - /** - * @param impl Factory. - * @param types Types. - */ - public JCacheRemoteQueryFactory(@Nullable Factory<? extends CacheEntryEventFilter> impl, byte types) { - this.impl = impl; - this.types = types; - } - - /** {@inheritDoc} */ - @Override public JCacheQueryRemoteFilter create() { - return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 06f14e1..50253fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.LoggerResource; @@ -415,45 +416,80 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - public UUID executeQuery(CacheEntryUpdatedListener locLsnr, - @Nullable CacheEntryEventSerializableFilter rmtFilter, - @Nullable Factory<? extends CacheEntryEventSerializableFilter> rmtFilterFactory, + public UUID executeQuery(final CacheEntryUpdatedListener locLsnr, + @Nullable final CacheEntryEventSerializableFilter rmtFilter, + @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory, int bufSize, long timeInterval, boolean autoUnsubscribe, boolean loc, - boolean keepBinary) throws IgniteCheckedException + final boolean keepBinary) throws IgniteCheckedException { + IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr; + if (rmtFilterFactory != null) - return executeQueryWithFilterFactory( - locLsnr, - rmtFilterFactory, - bufSize, - timeInterval, - autoUnsubscribe, - false, - false, - true, - false, - true, - loc, - keepBinary, - false); + clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply(Boolean v2) { + CacheContinuousQueryHandler hnd; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + true, + false, + true, + false, + null); + else { + CacheEntryEventFilter fltr = rmtFilterFactory.create(); + + if (!(fltr instanceof CacheEntryEventSerializableFilter)) + throw new IgniteException("Topology has nodes of the old versions. In this case " + + "EntryEventFilter should implement " + + "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr); + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + (CacheEntryEventSerializableFilter)fltr, + true, + false, + true, + false); + } + + return hnd; + } + }; else - return executeQueryWithFilter( - locLsnr, - rmtFilter, - bufSize, - timeInterval, - autoUnsubscribe, - false, - false, - true, - false, - true, - loc, - keepBinary, - false); + clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply(Boolean ignore) { + return new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + true, + false, + true, + false); + } + }; + + return executeQuery0( + locLsnr, + clsr, + bufSize, + timeInterval, + autoUnsubscribe, + false, + false, + loc, + keepBinary); } /** @@ -464,27 +500,35 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr, - CacheEntryEventSerializableFilter rmtFilter, - boolean loc, - boolean notifyExisting, - boolean ignoreClassNotFound) + public UUID executeInternalQuery(final CacheEntryUpdatedListener<?, ?> locLsnr, + final CacheEntryEventSerializableFilter rmtFilter, + final boolean loc, + final boolean notifyExisting, + final boolean ignoreClassNotFound) throws IgniteCheckedException { - return executeQueryWithFilter( + return executeQuery0( locLsnr, - rmtFilter, + new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) { + return new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilter, + true, + false, + true, + ignoreClassNotFound); + } + }, ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, true, notifyExisting, - true, - false, - true, loc, - false, - ignoreClassNotFound); + false); } /** @@ -558,195 +602,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param locLsnr Local listener. - * @param rmtFilter Remote filter. - * @param bufSize Buffer size. - * @param timeInterval Time interval. - * @param autoUnsubscribe Auto unsubscribe flag. - * @param internal Internal flag. - * @param notifyExisting Notify existing flag. - * @param oldValRequired Old value required flag. - * @param sync Synchronous flag. - * @param ignoreExpired Ignore expired event flag. - * @param loc Local flag. - * @return Continuous routine ID. - * @throws IgniteCheckedException In case of error. - */ - private UUID executeQueryWithFilter(CacheEntryUpdatedListener locLsnr, - final CacheEntryEventSerializableFilter rmtFilter, - int bufSize, - long timeInterval, - boolean autoUnsubscribe, - boolean internal, - boolean notifyExisting, - boolean oldValRequired, - boolean sync, - boolean ignoreExpired, - boolean loc, - final boolean keepBinary, - boolean ignoreClassNotFound) throws IgniteCheckedException - { - cctx.checkSecurity(SecurityPermission.CACHE_READ); - - int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? - cctx.kernalContext().job().currentTaskNameHash() : 0; - - boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); - - GridContinuousHandler hnd = new CacheContinuousQueryHandler( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - rmtFilter, - internal, - notifyExisting, - oldValRequired, - sync, - ignoreExpired, - taskNameHash, - skipPrimaryCheck, - cctx.isLocal(), - keepBinary, - ignoreClassNotFound); - - return executeQuery0(locLsnr, - bufSize, - timeInterval, - autoUnsubscribe, - notifyExisting, - loc, - keepBinary, - hnd); - } - - /** - * @param locLsnr Local listener. - * @param types JCache event types. * @param bufSize Buffer size. * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. * @param internal Internal flag. * @param notifyExisting Notify existing flag. - * @param oldValRequired Old value required flag. - * @param sync Synchronous flag. - * @param ignoreExpired Ignore expired event flag. * @param loc Local flag. * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ - private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr, - final Factory<CacheEntryEventFilter> rmtFilterFactory, - byte types, - int bufSize, - long timeInterval, - boolean autoUnsubscribe, - boolean internal, - boolean notifyExisting, - boolean oldValRequired, - boolean sync, - boolean ignoreExpired, - boolean loc, - final boolean keepBinary, - boolean ignoreClassNotFound) throws IgniteCheckedException - { - assert types != 0 : types; - - cctx.checkSecurity(SecurityPermission.CACHE_READ); - - int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? - cctx.kernalContext().job().currentTaskNameHash() : 0; - - boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); - - boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes()); - - GridContinuousHandler hnd; - - if (v2) - hnd = new CacheContinuousQueryHandlerV2( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - rmtFilterFactory, - internal, - notifyExisting, - oldValRequired, - sync, - ignoreExpired, - taskNameHash, - skipPrimaryCheck, - cctx.isLocal(), - keepBinary, - ignoreClassNotFound, - types); - else { - JCacheQueryRemoteFilter jCacheFilter; - - CacheEntryEventFilter filter = null; - - if (rmtFilterFactory != null) { - filter = rmtFilterFactory.create(); - - if (!(filter instanceof Serializable)) - throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + - "EntryEventFilter must implement java.io.Serializable interface. Filter: " + filter); - } - - jCacheFilter = new JCacheQueryRemoteFilter(filter, types); - - hnd = new CacheContinuousQueryHandler( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - jCacheFilter, - internal, - notifyExisting, - oldValRequired, - sync, - ignoreExpired, - taskNameHash, - skipPrimaryCheck, - cctx.isLocal(), - keepBinary, - ignoreClassNotFound); - } - - return executeQuery0(locLsnr, - bufSize, - timeInterval, - autoUnsubscribe, - notifyExisting, - loc, - keepBinary, - hnd); - } - - /** - * @param locLsnr Local listener. - * @param bufSize Buffer size. - * @param timeInterval Time interval. - * @param autoUnsubscribe Auto unsubscribe flag. - * @param internal Internal flag. - * @param notifyExisting Notify existing flag. - * @param oldValRequired Old value required flag. - * @param sync Synchronous flag. - * @param ignoreExpired Ignore expired event flag. - * @param loc Local flag. - * @return Continuous routine ID. - * @throws IgniteCheckedException In case of error. - */ - private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr, - final Factory<? extends CacheEntryEventFilter> rmtFilterFactory, + private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, + IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr, int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting, - boolean oldValRequired, - boolean sync, - boolean ignoreExpired, boolean loc, - final boolean keepBinary, - boolean ignoreClassNotFound) throws IgniteCheckedException + final boolean keepBinary) throws IgniteCheckedException { cctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -755,87 +628,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); - boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes()); + boolean v2 = useV2Protocol(cctx.discovery().allNodes()); - GridContinuousHandler hnd; + final CacheContinuousQueryHandler hnd = clsr.apply(v2); - if (v2) - hnd = new CacheContinuousQueryHandlerV2( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - rmtFilterFactory, - internal, - notifyExisting, - oldValRequired, - sync, - ignoreExpired, - taskNameHash, - skipPrimaryCheck, - cctx.isLocal(), - keepBinary, - ignoreClassNotFound, - null); - else { - CacheEntryEventFilter fltr = null; + hnd.taskNameHash(taskNameHash); + hnd.skipPrimaryCheck(skipPrimaryCheck); + hnd.notifyExisting(notifyExisting); + hnd.internal(internal); + hnd.keepBinary(keepBinary); + hnd.localCache(loc); - if (rmtFilterFactory != null) { - fltr = rmtFilterFactory.create(); - - if (!(fltr instanceof CacheEntryEventSerializableFilter)) - throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " + - "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " + - "interface. Filter: " + fltr); - } - - hnd = new CacheContinuousQueryHandler( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - (CacheEntryEventSerializableFilter)fltr, - internal, - notifyExisting, - oldValRequired, - sync, - ignoreExpired, - taskNameHash, - skipPrimaryCheck, - cctx.isLocal(), - keepBinary, - ignoreClassNotFound); - } - - return executeQuery0(locLsnr, - bufSize, - timeInterval, - autoUnsubscribe, - notifyExisting, - loc, - keepBinary, - hnd); - } - - /** - * @param locLsnr Local listener. - * @param bufSize Buffer size. - * @param timeInterval Time interval. - * @param autoUnsubscribe Auto unsubscribe flag. - * @param notifyExisting Notify existing flag. - * @param loc Local flag. - * @param keepBinary Keep binary. - * @param hnd Handler. - * @return Continuous routine ID. - * @throws IgniteCheckedException In case of error. - */ - private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, - int bufSize, - long timeInterval, - boolean autoUnsubscribe, - boolean notifyExisting, - boolean loc, - final boolean keepBinary, - final GridContinuousHandler hnd) - throws IgniteCheckedException { IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue(); @@ -1028,25 +831,70 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (types == 0) throw new IgniteCheckedException("Listener must implement one of CacheEntryListener sub-interfaces."); - CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener( + final byte types0 = types; + + final CacheEntryUpdatedListener locLsnr = new JCacheQueryLocalListener( locLsnrImpl, log); - routineId = executeJCacheQueryFactory( + routineId = executeQuery0( locLsnr, - cfg.getCacheEntryEventFilterFactory(), - types, + new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply(Boolean v2) { + CacheContinuousQueryHandler hnd; + Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory(); + + v2 = rmtFilterFactory != null && v2; + + if (v2) + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + cfg.isOldValueRequired(), + cfg.isSynchronous(), + false, + false, + types0); + else { + JCacheQueryRemoteFilter jCacheFilter; + + CacheEntryEventFilter filter = null; + + if (rmtFilterFactory != null) { + filter = rmtFilterFactory.create(); + + if (!(filter instanceof Serializable)) + throw new IgniteException("Topology has nodes of the old versions. " + + "In this case EntryEventFilter must implement java.io.Serializable " + + "interface. Filter: " + filter); + } + + jCacheFilter = new JCacheQueryRemoteFilter(filter, types0); + + hnd = new CacheContinuousQueryHandler( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + jCacheFilter, + cfg.isOldValueRequired(), + cfg.isSynchronous(), + false, + false); + } + + return hnd; + } + }, ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, false, false, - cfg.isOldValueRequired(), - cfg.isSynchronous(), - false, false, - keepBinary, - false); + keepBinary + ); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 232e1ff..48227fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -159,9 +159,4 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @param topVer Topology version. */ public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs); - - /** - * @return Cache entry filter. - */ - public CacheEntryEventFilter getEventFilter(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java index baa5a62..6143fa9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java @@ -26,10 +26,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Factory; @@ -56,7 +58,10 @@ import org.apache.ignite.transactions.TransactionIsolation; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted; import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; @@ -80,6 +85,53 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR /** */ public static final int ITERATION_CNT = 40; + /** + * @throws Exception If failed. + */ + public void testInternalQuery() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg); + + UUID uuid = null; + + try { + for (int i = 0; i < 10; i++) + cache.put(i, i); + + final CountDownLatch latch = new CountDownLatch(5); + + CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + for (Object evt : iterable) { + latch.countDown(); + + log.info("Received event: " + evt); + } + } + }; + + uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true); + + for (int i = 10; i < 20; i++) + cache.put(i, i); + + assertTrue(latch.await(3, SECONDS)); + } + finally { + if (uuid != null) + grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .cancelInternalQuery(uuid); + + cache.destroy(); + } + } + /** {@inheritDoc} */ @Override protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) throws Exception { @@ -542,7 +594,8 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR * */ protected static class NonSerializableFilter - implements CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { + implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey, + CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable { /** */ public NonSerializableFilter() { // No-op. @@ -575,6 +628,29 @@ public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryR /** * */ + protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{ + /** */ + public SerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event) + throws CacheEntryListenerException { + return isAccepted(event.getValue()); + } + + /** + * @return {@code True} if value is even. + */ + public static boolean isAccepted(Integer val) { + return val == null || val % 2 == 0; + } + } + + /** + * + */ protected static class FilterFactory implements Factory<NonSerializableFilter> { @Override public NonSerializableFilter create() { return new NonSerializableFilter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c26aa48a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index c18cf35..cdf4ffd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -1175,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract * @param store If {@code true} configures dummy cache store. * @return Cache configuration. */ - private CacheConfiguration<Object, Object> cacheConfiguration( + protected CacheConfiguration<Object, Object> cacheConfiguration( CacheMode cacheMode, int backups, CacheAtomicityMode atomicityMode,
