cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff0a2dd8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff0a2dd8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff0a2dd8 Branch: refs/heads/ignite-5075-cc-debug Commit: ff0a2dd8aeaf621b79c504934f6830537323fcd1 Parents: e6ffd82 Author: sboikov <[email protected]> Authored: Thu May 25 15:37:47 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 15:37:47 2017 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryEventBuffer.java | 201 ++++++++++++++----- .../CacheContinuousQueryPartitionRecovery.java | 23 ++- 2 files changed, 167 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ff0a2dd8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index f0640b1..f496c8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.jetbrains.annotations.Nullable; /** @@ -75,15 +76,27 @@ public class CacheContinuousQueryEventBuffer { * @return Backup entries. */ @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() { + Collection<CacheContinuousQueryEntry> ret; + + List<CacheContinuousQueryEntry> entries = null; + + Batch batch = curBatch.get(); + + if (batch != null) + entries = batch.backupFlushEntries(); + if (!backupQ.isEmpty()) { - ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ; + if (entries != null) + backupQ.addAll(entries); - backupQ = new ConcurrentLinkedDeque<>(); + ret = this.backupQ; - return ret; + backupQ = new ConcurrentLinkedDeque<>(); } + else + ret = entries; - return null; + return ret; } /** @@ -122,11 +135,9 @@ public class CacheContinuousQueryEventBuffer { private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) { assert cntr >= 0 : cntr; - Batch batch = initBatch(); + Batch batch = initBatch(entry.topologyVersion()); if (batch == null || cntr < batch.startCntr) { - assert entry != null : cntr; - if (backup) backupQ.add(entry); @@ -157,9 +168,10 @@ public class CacheContinuousQueryEventBuffer { } /** + * @param topVer Current event topology version. * @return Current batch. */ - @Nullable private Batch initBatch() { + @Nullable private Batch initBatch(AffinityTopologyVersion topVer) { Batch batch = curBatch.get(); if (batch != null) @@ -170,7 +182,7 @@ public class CacheContinuousQueryEventBuffer { if (curCntr == -1) return null; - batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]); + batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer); if (curBatch.compareAndSet(null, batch)) return batch; @@ -216,28 +228,117 @@ public class CacheContinuousQueryEventBuffer { private int lastProc = -1; /** */ - private final Object[] evts; + private final CacheContinuousQueryEntry[] entries; + + /** */ + private final AffinityTopologyVersion topVer; /** * @param filtered Number of filtered events before this batch. - * @param evts Events array. + * @param entries Entries array. + * @param topVer Current event topology version. * @param startCntr Start counter. */ - Batch(long startCntr, long filtered, Object[] evts) { + Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) { assert startCntr >= 0; assert filtered >= 0; this.startCntr = startCntr; this.filtered = filtered; - this.evts = evts; + this.entries = entries; + this.topVer = topVer; endCntr = startCntr + BUF_SIZE - 1; } /** + * @return Entries to send as part of backup queue. + */ + @Nullable synchronized List<CacheContinuousQueryEntry> backupFlushEntries() { + List<CacheContinuousQueryEntry> res = null; + + long filtered = this.filtered; + long cntr = startCntr; + + for (int i = 0; i < entries.length; i++) { + CacheContinuousQueryEntry e = entries[i]; + + CacheContinuousQueryEntry flushEntry = null; + + if (e == null) { + if (filtered != 0) { + flushEntry = filteredEntry(cntr - 1, filtered - 1); + + filtered = 0; + } + } + else { + if (e.isFiltered()) + filtered++; + else { + flushEntry = new CacheContinuousQueryEntry(e.cacheId(), + e.eventType(), + e.key(), + e.value(), + e.oldValue(), + e.isKeepBinary(), + e.partition(), + e.updateCounter(), + e.topologyVersion()); + + flushEntry.filteredCount(filtered); + + filtered = 0; + } + } + + if (flushEntry != null) { + if (res == null) + res = new ArrayList<>(); + + res.add(flushEntry); + } + + cntr++; + } + + if (filtered != 0L) { + if (res == null) + res = new ArrayList<>(); + + res.add(filteredEntry(cntr - 1, filtered - 1)); + } + + return res; + } + + /** + * @param cntr Entry counter. + * @param filtered Number of entries filtered before this entry. + * @return Entry. + */ + private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) { + CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0, + null, + null, + null, + null, + false, + part, + cntr, + topVer); + + e.markFiltered(); + + e.filteredCount(filtered); + + return e; + } + + /** * @param res Current result. - * @param cntr Event counter. - * @param evt Event. + * @param cntr Entry counter. + * @param entry Entry. * @param backup Backup entry flag. * @return New result. */ @@ -245,60 +346,54 @@ public class CacheContinuousQueryEventBuffer { @Nullable private Object processEvent0( @Nullable Object res, long cntr, - CacheContinuousQueryEntry evt, + CacheContinuousQueryEntry entry, boolean backup) { int pos = (int)(cntr - startCntr); synchronized (this) { - evts[pos] = evt; + entries[pos] = entry; int next = lastProc + 1; if (next == pos) { - for (int i = next; i < evts.length; i++) { - Object e = evts[i]; - - if (e != null) { - if (e.getClass() == Long.class) - filtered++; - else { - CacheContinuousQueryEntry evt0 = (CacheContinuousQueryEntry)e; + for (int i = next; i < entries.length; i++) { + CacheContinuousQueryEntry entry0 = entries[i]; - if (!evt0.isFiltered()) { - evt0.filteredCount(filtered); + if (entry0 != null) { + if (!entry0.isFiltered()) { + entry0.filteredCount(filtered); - filtered = 0; + filtered = 0; - if (res == null) { - if (backup) - backupQ.add(evt0); - else - res = evt0; - } - else { - assert !backup; + if (res == null) { + if (backup) + backupQ.add(entry0); + else + res = entry0; + } + else { + assert !backup; - List<CacheContinuousQueryEntry> resList; + List<CacheContinuousQueryEntry> resList; - if (res instanceof CacheContinuousQueryEntry) { - resList = new ArrayList<>(); + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); - resList.add((CacheContinuousQueryEntry)res); - } - else { - assert res instanceof List : res; + resList.add((CacheContinuousQueryEntry)res); + } + else { + assert res instanceof List : res; - resList = (List<CacheContinuousQueryEntry>)res; - } + resList = (List<CacheContinuousQueryEntry>)res; + } - resList.add(evt0); + resList.add(entry0); - res = resList; - } + res = resList; } - else - filtered++; } + else + filtered++; pos = i; } @@ -312,10 +407,10 @@ public class CacheContinuousQueryEventBuffer { return res; } - if (pos == evts.length -1) { - Arrays.fill(evts, null); + if (pos == entries.length -1) { + Arrays.fill(entries, null); - Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, evts); + Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, entries, entry.topologyVersion()); curBatch.set(nextBatch); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ff0a2dd8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java index 534ce9c..59252d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -38,7 +38,13 @@ import org.jetbrains.annotations.Nullable; */ class CacheContinuousQueryPartitionRecovery { /** Event which means hole in sequence. */ - private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); + private static final CacheContinuousQueryEntry HOLE; + + static { + HOLE = new CacheContinuousQueryEntry(); + + HOLE.markFiltered(); + } /** */ private final static int MAX_BUFF_SIZE = CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE; @@ -53,7 +59,7 @@ class CacheContinuousQueryPartitionRecovery { private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; /** */ - private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); + private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); /** * @param log Logger. @@ -212,6 +218,8 @@ class CacheContinuousQueryPartitionRecovery { } } else { + boolean skippedFiltered = false; + while (iter.hasNext()) { Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); @@ -232,9 +240,16 @@ class CacheContinuousQueryPartitionRecovery { iter.remove(); } - else - break; + else { + if (pending.isFiltered()) + skippedFiltered = true; + else + break; + } } + + if (skippedFiltered) + pendingEvts.headMap(lastFiredEvt).clear(); } }
