cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2942a58 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2942a58 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2942a58 Branch: refs/heads/ignite-5075-cc-debug Commit: d2942a58971a6d3797781151283c42549f0cc8f6 Parents: 8fafde2 Author: sboikov <[email protected]> Authored: Thu May 25 15:25:32 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 15:25:32 2017 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryEventBuffer.java | 220 ++++++++++++++----- .../CacheContinuousQueryPartitionRecovery.java | 19 +- 2 files changed, 184 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d2942a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 59b92eb..264a6f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.spi.communication.tcp.TestDebugLog; import org.jetbrains.annotations.Nullable; @@ -76,15 +77,39 @@ public class CacheContinuousQueryEventBuffer { * @return Backup entries. */ @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() { + Collection<CacheContinuousQueryEntry> ret = null; + + List<CacheContinuousQueryEntry> entries = null; + + Batch batch = curBatch.get(); + + if (batch != null) + entries = batch.backupFlushEntries(); + if (!backupQ.isEmpty()) { - ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ; + ret = this.backupQ; + + if (entries != null) { + for (CacheContinuousQueryEntry e : entries) + ((ConcurrentLinkedDeque)ret).addFirst(e); + } backupQ = new ConcurrentLinkedDeque<>(); + } + else + ret = entries; - return ret; + if (ret != null) { + for (CacheContinuousQueryEntry e : ret) + TestDebugLog.addEntryMessage(part, + e.updateCounter(), + "filtered1 " + e.filteredCount() + + " reset backup"); } + else + TestDebugLog.addEntryMessage(part, part, "no backup"); - return null; + return entries; } /** @@ -123,7 +148,7 @@ public class CacheContinuousQueryEventBuffer { private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) { assert cntr >= 0 : cntr; - Batch batch = initBatch(); + Batch batch = initBatch(entry.topologyVersion()); if (batch == null || cntr < batch.startCntr) { assert entry != null : cntr; @@ -173,7 +198,7 @@ public class CacheContinuousQueryEventBuffer { /** * @return Current batch. */ - @Nullable private Batch initBatch() { + @Nullable private Batch initBatch(AffinityTopologyVersion topVer) { Batch batch = curBatch.get(); if (batch != null) @@ -186,7 +211,7 @@ public class CacheContinuousQueryEventBuffer { TestDebugLog.addEntryMessage(part, curCntr, "created batch"); - batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]); + batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer); if (curBatch.compareAndSet(null, batch)) return batch; @@ -232,28 +257,108 @@ public class CacheContinuousQueryEventBuffer { private int lastProc = -1; /** */ - private final Object[] evts; + private final CacheContinuousQueryEntry[] entries; + + /** */ + private final AffinityTopologyVersion topVer; /** * @param filtered Number of filtered events before this batch. - * @param evts Events array. + * @param entries Entries array. * @param startCntr Start counter. */ - Batch(long startCntr, long filtered, Object[] evts) { + Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) { assert startCntr >= 0; assert filtered >= 0; this.startCntr = startCntr; this.filtered = filtered; - this.evts = evts; + this.entries = entries; + this.topVer = topVer; endCntr = startCntr + BUF_SIZE - 1; } + synchronized List<CacheContinuousQueryEntry> backupFlushEntries() { + List<CacheContinuousQueryEntry> res = null; + + long filtered = this.filtered; + long cntr = startCntr; + + for (int i = 0; i < entries.length; i++) { + CacheContinuousQueryEntry e = entries[i]; + + CacheContinuousQueryEntry flushEntry = null; + + if (e == null) { + if (filtered != 0) { + flushEntry = filteredEntry(cntr - 1, filtered - 1); + + filtered = 0; + } + } + else { + if (e.isFiltered()) + filtered++; + else { + flushEntry = new CacheContinuousQueryEntry(e.cacheId(), + e.eventType(), + e.key(), + e.value(), + e.oldValue(), + e.isKeepBinary(), + e.partition(), + e.updateCounter(), + e.topologyVersion()); + + flushEntry.filteredCount(filtered); + + filtered = 0; + } + } + + if (flushEntry != null) { + if (res == null) + res = new ArrayList<>(); + + res.add(flushEntry); + } + + cntr++; + } + + if (filtered != 0L) { + if (res == null) + res = new ArrayList<>(); + + res.add(filteredEntry(cntr - 1, filtered - 1)); + } + + return res; + } + + private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) { + CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0, + null, + null, + null, + null, + false, + part, + cntr, + topVer); + + e.markFiltered(); + + e.filteredCount(filtered); + + return e; + } + /** * @param res Current result. - * @param cntr Event counter. - * @param evt Event. + * @param cntr Entry counter. + * @param entry Entry. * @param backup Backup entry flag. * @return New result. */ @@ -261,7 +366,7 @@ public class CacheContinuousQueryEventBuffer { @Nullable private Object processEvent0( @Nullable Object res, long cntr, - CacheContinuousQueryEntry evt, + CacheContinuousQueryEntry entry, boolean backup) { int pos = (int)(cntr - startCntr); @@ -271,56 +376,67 @@ public class CacheContinuousQueryEventBuffer { "buffer process start=" + startCntr + ", lastProc=" + lastProc + " pos=" + pos + - " topVer=" + ((CacheContinuousQueryEntry)evt).topologyVersion()); + " topVer=" + entry.topologyVersion()); - evts[pos] = evt; + entries[pos] = entry; int next = lastProc + 1; if (next == pos) { - for (int i = next; i < evts.length; i++) { - Object e = evts[i]; - - if (e != null) { - if (e.getClass() == Long.class) - filtered++; - else { - CacheContinuousQueryEntry evt0 = (CacheContinuousQueryEntry)e; + for (int i = next; i < entries.length; i++) { + CacheContinuousQueryEntry entry0 = entries[i]; + + if (entry0 != null) { + if (!entry0.isFiltered()) { + TestDebugLog.addEntryMessage(part, + cntr, + "buffer process res start=" + startCntr + + ", lastProc=" + lastProc + + " pos=" + pos + + ", filtered=" + filtered + + " topVer=" + entry0.topologyVersion()); + + entry0.filteredCount(filtered); + + filtered = 0; + + if (res == null) { + if (backup) + backupQ.add(entry0); + else + res = entry0; + } + else { + assert !backup; - if (!evt0.isFiltered()) { - evt0.filteredCount(filtered); + List<CacheContinuousQueryEntry> resList; - filtered = 0; + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); - if (res == null) { - if (backup) - backupQ.add(evt0); - else - res = evt0; + resList.add((CacheContinuousQueryEntry)res); } else { - assert !backup; - - List<CacheContinuousQueryEntry> resList; - - if (res instanceof CacheContinuousQueryEntry) { - resList = new ArrayList<>(); - - resList.add((CacheContinuousQueryEntry)res); - } - else { - assert res instanceof List : res; + assert res instanceof List : res; - resList = (List<CacheContinuousQueryEntry>)res; - } + resList = (List<CacheContinuousQueryEntry>)res; + } - resList.add(evt0); + resList.add(entry0); - res = resList; - } + res = resList; } - else - filtered++; + } + else { + filtered++; + + TestDebugLog.addEntryMessage(part, + cntr, + "buffer process inc filtered start=" + startCntr + + ", lastProc=" + lastProc + + " pos=" + pos + + ", filtered=" + filtered + + " topVer=" + entry0.topologyVersion()); } pos = i; @@ -335,10 +451,10 @@ public class CacheContinuousQueryEventBuffer { return res; } - if (pos == evts.length -1) { - Arrays.fill(evts, null); + if (pos == entries.length -1) { + Arrays.fill(entries, null); - Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, evts); + Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, entries, entry.topologyVersion()); curBatch.set(nextBatch); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d2942a58/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java index 12eaa20..e031428 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -41,7 +41,13 @@ import static org.apache.ignite.internal.processors.cache.query.continuous.Cache */ class CacheContinuousQueryPartitionRecovery { /** Event which means hole in sequence. */ - private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); + private static final CacheContinuousQueryEntry HOLE; + + static { + HOLE = new CacheContinuousQueryEntry(); + + HOLE.markFiltered(); + } /** */ private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE; @@ -56,7 +62,7 @@ class CacheContinuousQueryPartitionRecovery { private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; /** */ - private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); + private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); /** * @param log Logger. @@ -240,6 +246,8 @@ class CacheContinuousQueryPartitionRecovery { } } else { + boolean skip = false; + while (iter.hasNext()) { Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next(); @@ -269,7 +277,9 @@ class CacheContinuousQueryPartitionRecovery { iter.remove(); } - else { + else if (!pending.isFiltered()) { + skip = true; + TestDebugLog.addEntryMessage(entry.partition(), entry.updateCounter(), "stop process last=" + lastFiredEvt + " cntr=" + e.getKey() + " topVer=" + e.getValue().topologyVersion() + " f=" + pending.filteredCount()); @@ -277,6 +287,9 @@ class CacheContinuousQueryPartitionRecovery { break; } } + + if (skip) + pendingEvts.headMap(lastFiredEvt).clear(); } }
