cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/674e7dd2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/674e7dd2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/674e7dd2 Branch: refs/heads/ignite-5075-cc-debug Commit: 674e7dd23edaaff66b84d27794a006896647c408 Parents: f651e87 Author: sboikov <sboi...@gridgain.com> Authored: Fri May 26 11:44:27 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri May 26 11:51:03 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryEntry.java | 23 +++++++++++++------- .../CacheContinuousQueryEventBuffer.java | 18 ++++++++------- .../continuous/CacheContinuousQueryManager.java | 12 ++++++---- .../CacheContinuousQueryPartitionRecovery.java | 2 +- .../CacheContinuousQueryEventBufferTest.java | 7 +++--- ...eCacheContinuousQueryImmutableEntryTest.java | 4 ++-- 6 files changed, 40 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 28fdee3..3f463a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -128,6 +128,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param part Partition. * @param updateCntr Update partition counter. * @param topVer Topology version if applicable. + * @param flags Flags. */ CacheContinuousQueryEntry( int cacheId, @@ -138,7 +139,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { boolean keepBinary, int part, long updateCntr, - @Nullable AffinityTopologyVersion topVer) { + @Nullable AffinityTopologyVersion topVer, + byte flags) { this.cacheId = cacheId; this.evtType = evtType; this.key = key; @@ -147,9 +149,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { this.part = part; this.updateCntr = updateCntr; this.topVer = topVer; + this.flags = flags; if (keepBinary) - flags |= KEEP_BINARY; + this.flags |= KEEP_BINARY; + } + + /** + * @return Flags. + */ + public byte flags() { + return flags; } /** @@ -233,7 +243,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { if (!isFiltered()) return this; - CacheContinuousQueryEntry e = new CacheContinuousQueryEntry( + return new CacheContinuousQueryEntry( cacheId, null, null, @@ -242,11 +252,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { false, part, updateCntr, - topVer); - - e.flags = flags; - - return e; + topVer, + flags); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index a072240..afe34c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -363,7 +363,8 @@ public class CacheContinuousQueryEventBuffer { e.isKeepBinary(), e.partition(), e.updateCounter(), - e.topologyVersion()); + e.topologyVersion(), + e.flags()); flushEntry.filteredCount(filtered); @@ -399,13 +400,14 @@ public class CacheContinuousQueryEventBuffer { private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) { CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0, null, - null, - null, - null, - false, - part, - cntr, - topVer); + null, + null, + null, + false, + part, + cntr, + topVer, + (byte)0); e.markFiltered(); http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 7cbb1e1..1a655e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -193,7 +193,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.keepBinary(), partId, updCntr, - topVer); + topVer, + (byte)0); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -339,7 +340,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.keepBinary(), partId, updateCntr, - topVer); + topVer, + (byte)0); IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name()); @@ -400,7 +402,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.keepBinary(), e.partition(), -1, - null); + null, + (byte)0); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); @@ -703,7 +706,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { keepBinary, 0, -1, - null); + null, + (byte)0); next = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java index 59252d2..e210c24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -236,7 +236,7 @@ class CacheContinuousQueryPartitionRecovery { lastFiredEvt = e.getKey(); if (e.getValue() != HOLE && !e.getValue().isFiltered()) - entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); + entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, pending)); iter.remove(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java index 4710593..382f166 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java @@ -120,8 +120,8 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest false, 0, cntr, - null); - + null, + (byte)0); entries.add(entry); @@ -140,7 +140,8 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest false, 0, cntr, - null); + null, + (byte)0); expEntry.filteredCount(filtered); http://git-wip-us.apache.org/repos/asf/ignite/blob/674e7dd2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index d230320..81a7515 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -138,7 +137,8 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst true, 1, 1L, - new AffinityTopologyVersion(1L)); + new AffinityTopologyVersion(1L), + (byte)0); e0.markFiltered();