Repository: ignite Updated Branches: refs/heads/ignite-5075-cc be43bf8fe -> ed2f8b3b7
cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ed2f8b3b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ed2f8b3b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ed2f8b3b Branch: refs/heads/ignite-5075-cc Commit: ed2f8b3b719c42f380b8b247b1b62d4aa1c73184 Parents: be43bf8 Author: sboikov <[email protected]> Authored: Mon May 29 09:03:33 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 29 09:03:33 2017 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryEventBuffer.java | 43 +++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ed2f8b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 7d33614..74b3ff8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteSystemProperties; @@ -78,37 +79,37 @@ public class CacheContinuousQueryEventBuffer { * @return Backup entries. */ @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() { - Collection<CacheContinuousQueryEntry> ret = null; - - Batch batch = curBatch.get(); - - if (batch != null) - ret = batch.flushCurrentEntries(); + TreeMap<Long, CacheContinuousQueryEntry> ret = null; int size = backupQ.sizex(); if (size > 0) { - if (ret == null) - ret = new ArrayList<>(); + ret = new TreeMap<>(); for (int i = 0; i < size; i++) { CacheContinuousQueryEntry e = backupQ.pollFirst(); if (e != null) - ret.add(e); + ret.put(e.updateCounter(), e); else break; } } + Batch batch = curBatch.get(); + + if (batch != null) + ret = batch.flushCurrentEntries(ret); + if (!pending.isEmpty()) { if (ret == null) - ret = new ArrayList<>(); + ret = new TreeMap<>(); - ret.addAll(pending.values()); + for (CacheContinuousQueryEntry e : pending.values()) + ret.put(e.updateCounter(), e); } - return ret; + return ret != null ? ret.values() : null; } /** @@ -319,13 +320,13 @@ public class CacheContinuousQueryEventBuffer { } /** + * @param res Current entries. * @return Entries to send as part of backup queue. */ - @Nullable synchronized List<CacheContinuousQueryEntry> flushCurrentEntries() { + @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries( + @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) { if (entries == null) - return null; - - List<CacheContinuousQueryEntry> res = null; + return res; long filtered = this.filtered; long cntr = startCntr; @@ -365,9 +366,9 @@ public class CacheContinuousQueryEventBuffer { if (flushEntry != null) { if (res == null) - res = new ArrayList<>(); + res = new TreeMap<>(); - res.add(flushEntry); + res.put(flushEntry.updateCounter(), flushEntry); } cntr++; @@ -375,9 +376,11 @@ public class CacheContinuousQueryEventBuffer { if (filtered != 0L) { if (res == null) - res = new ArrayList<>(); + res = new TreeMap<>(); + + CacheContinuousQueryEntry flushEntry = filteredEntry(cntr - 1, filtered - 1); - res.add(filteredEntry(cntr - 1, filtered - 1)); + res.put(flushEntry.updateCounter(), flushEntry); } return res;
