cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e24b538d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e24b538d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e24b538d

Branch: refs/heads/ignite-5075-cc-debug
Commit: e24b538d106aa219d6bd141a699eaf879753cc80
Parents: 674e7dd
Author: sboikov <sboi...@gridgain.com>
Authored: Fri May 26 12:46:16 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri May 26 12:46:16 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java | 44 +++++++-------------
 1 file changed, 15 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/e24b538d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index afe34c4..acc6d50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -81,40 +81,26 @@ public class CacheContinuousQueryEventBuffer {
     @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() {
         Collection<CacheContinuousQueryEntry> ret = null;
 
-        for (;;) {
-            Batch batch = curBatch.get();
-
-            if (batch != null) {
-                Collection<CacheContinuousQueryEntry> ret0 = batch.flushAndReset();
-
-                if (ret0 != null) {
-                    if (ret == null)
-                        ret = ret0;
-                    else
-                        ret.addAll(ret0);
-                }
-            }
+        Batch batch = curBatch.get();
 
-            if (!backupQ.isEmpty()) {
-                if (ret == null)
-                    ret = new ArrayList<>();
+        if (batch != null)
+            ret = batch.flushCurrentEntries();
 
-                CacheContinuousQueryEntry e;
+        if (!backupQ.isEmpty()) {
+            if (ret == null)
+                ret = new ArrayList<>();
 
-                while ((e = backupQ.pollFirst()) != null)
-                    ret.add(e);
-            }
+            CacheContinuousQueryEntry e;
 
-            if (!pending.isEmpty()) {
-                if (ret == null)
-                    ret = new ArrayList<>();
+            while ((e = backupQ.pollFirst()) != null)
+                ret.add(e);
+        }
 
-                ret.addAll(pending.values());
-            }
+        if (!pending.isEmpty()) {
+            if (ret == null)
+                ret = new ArrayList<>();
 
-            break;
-//            if (curBatch.compareAndSet(batch, null))
-//                break;
+            ret.addAll(pending.values());
         }
 
         return ret;
@@ -330,7 +316,7 @@ public class CacheContinuousQueryEventBuffer {
         /**
          * @return Entries to send as part of backup queue.
          */
-        @Nullable synchronized List<CacheContinuousQueryEntry> flushAndReset() {
+        @Nullable synchronized List<CacheContinuousQueryEntry> flushCurrentEntries() {
             if (entries == null)
                 return null;
 