ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9db0d486 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9db0d486 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9db0d486 Branch: refs/heads/ignite-5075 Commit: 9db0d4862070da6db7973e2b66067dd00da0ba8d Parents: 681454c Author: sboikov <[email protected]> Authored: Mon May 29 17:19:53 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 29 17:19:53 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 15 ++++++++ .../CacheContinuousQueryListener.java | 16 ++++++++ .../continuous/CacheContinuousQueryManager.java | 9 +++-- .../query/continuous/CounterSkipContext.java | 40 ++++++++++++-------- 4 files changed, 61 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 82b57b4..8d6aa2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -487,6 +487,21 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler onEntryUpdated(evt, primary, false, null); } + @Override public CounterSkipContext skipUpdateCounter(GridCacheContext cctx, + @Nullable CounterSkipContext ctx, + int part, + long cntr, + AffinityTopologyVersion topVer) { + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part); + + if (ctx == null) + ctx = new CounterSkipContext(part, cntr, topVer); + + buf.processEntry(ctx.entry(), true); + + return ctx; + } + @Override public void onPartitionEvicted(int part) { entryBufs.remove(part); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 84b22f9..fe9c198 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.jetbrains.annotations.Nullable; @@ -76,6 +77,21 @@ public interface CacheContinuousQueryListener<K, V> { public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary); /** + * @param cctx Cache context. + * @param skipCtx Context. + * @param part Partition. + * @param cntr Counter to skip. + * @param topVer Topology version. + * @return Context. + */ + public CounterSkipContext skipUpdateCounter( + GridCacheContext cctx, + @Nullable CounterSkipContext skipCtx, + int part, + long cntr, + AffinityTopologyVersion topVer); + + /** * @param part Partition. */ public void onPartitionEvicted(int part); http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 6a10ed5..9910955 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -204,17 +204,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** - * @param ctx Context. + * @param skipCtx Context. * @param part Partition number. * @param cntr Update counter. * @param topVer Topology version. * @return Context. */ - @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext ctx, + @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx, int part, long cntr, AffinityTopologyVersion topVer) { - return null; + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer); + + return skipCtx; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9db0d486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java index 89ac6f9..41183c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java @@ -26,27 +26,35 @@ import org.jetbrains.annotations.Nullable; */ public class CounterSkipContext { /** */ - private CacheContinuousQueryEntry entry; + private final CacheContinuousQueryEntry entry; /** */ private List<Runnable> readySendC; - CacheContinuousQueryEntry entry(int part, long cntr, AffinityTopologyVersion topVer) { - if (entry == null) { - entry = new CacheContinuousQueryEntry(0, - null, - null, - null, - null, - false, - part, - cntr, - topVer, - (byte)0); - - entry.markFiltered(); - } + /** + * @param part Partition. + * @param cntr Filtered counter. + * @param topVer Topology version. + */ + CounterSkipContext(int part, long cntr, AffinityTopologyVersion topVer) { + entry = new CacheContinuousQueryEntry(0, + null, + null, + null, + null, + false, + part, + cntr, + topVer, + (byte)0); + + entry.markFiltered(); + } + /** + * @return Entry for filtered counter. + */ + CacheContinuousQueryEntry entry() { return entry; }
