Repository: ignite Updated Branches: refs/heads/ignite-5075-cc 52b7b5bad -> 3122352ca
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3122352c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3122352c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3122352c Branch: refs/heads/ignite-5075-cc Commit: 3122352ca0ad78f07379455a529195fb925f15e7 Parents: 52b7b5b Author: sboikov <[email protected]> Authored: Wed May 24 15:02:36 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 24 15:02:36 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryEntry.java | 18 +++++++++++- .../CacheContinuousQueryEventBuffer.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 29 +++++++++----------- .../CacheContinuousQueryEventBufferTest.java | 4 +-- 4 files changed, 33 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index e40f83e..9db92b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -109,7 +109,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private boolean keepBinary; /** */ - public long filteredCnt; + private long filteredCnt; /** * Required by {@link Message}. @@ -207,6 +207,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @param filteredCnt Number of entries filtered before this entry. + */ + void filteredCount(long filteredCnt) { + assert filteredCnt >= 0 : filteredCnt; + + this.filteredCnt = filteredCnt; + } + + /** + * @return Number of entries filtered before this entry. + */ + long filteredCount() { + return filteredCnt; + } + + /** * @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key * (avoid to huge memory consumption), otherwise {@code this}. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index e3a8eda..7ddea92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -217,7 +217,7 @@ public class CacheContinuousQueryEventBuffer { CacheContinuousQueryEntry evt0 = (CacheContinuousQueryEntry)e; if (!evt0.isFiltered()) { - evt0.filteredCnt = filtered; + evt0.filteredCount(filtered); filtered = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ab70f81..18faff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -1097,20 +1097,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } // Check duplicate. - if (entry.updateCounter() > lastFiredEvt) { + if (entry.updateCounter() > lastFiredEvt) pendingEvts.put(entry.updateCounter(), entry); - - // TODO - if (entry.filteredCnt > 0) { - long filteredCntr = entry.updateCounter() - entry.filteredCnt; - - for (long i = 0; i < entry.filteredCnt; i++) { - pendingEvts.put(filteredCntr, HOLE); - - filteredCntr++; - } - } - } else { if (log.isDebugEnabled()) log.debug("Skip duplicate continuous query message: " + entry); @@ -1143,6 +1131,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } LT.warn(log, "Pending events reached max of buffer size [cache=" + cctx.name() + + ", bufSize=" + MAX_BUFF_SIZE + ", partId=" + entry.partition() + ']'); for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { @@ -1157,12 +1146,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } else { - // Elements are consistently. while (iter.hasNext()) { Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); - if (e.getKey() == lastFiredEvt + 1) { - ++lastFiredEvt; + CacheContinuousQueryEntry pending = e.getValue(); + + long filtered = pending.filteredCount(); + + boolean fire = entry.updateCounter() == lastFiredEvt + 1;; + + if (!fire && filtered > 0) + fire = entry.updateCounter() - filtered == lastFiredEvt + 1; + + if (fire) { + lastFiredEvt = entry.updateCounter(); if (e.getValue() != HOLE && !e.getValue().isFiltered()) entries.add(new CacheContinuousQueryEvent<K, V>(cache, cctx, e.getValue())); http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java index 75a664c..bc32e00 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java @@ -143,7 +143,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest cntr, null); - expEntry.filteredCnt = filtered; + expEntry.filteredCount(filtered); cntr++; @@ -221,7 +221,7 @@ public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest CacheContinuousQueryEntry actualEvt = actualEntries.get(i); assertEquals(expEvt.updateCounter(), actualEvt.updateCounter()); - assertEquals(expEvt.filteredCnt, actualEvt.filteredCnt); + assertEquals(expEvt.filteredCount(), actualEvt.filteredCount()); } } }
