Merge branch 'ignite-5075-cc' into ignite-5075-cc-debug # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ee752d8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ee752d8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ee752d8 Branch: refs/heads/ignite-5075-cc-debug Commit: 8ee752d87daa3b9337f01dce3008f04c0bcc2238 Parents: 460ba11 2c68adb Author: sboikov <[email protected]> Authored: Thu May 25 13:21:02 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 13:21:02 2017 +0300 ---------------------------------------------------------------------- .../CacheContinuousQueryAcknowledgeBuffer.java | 120 ++++++ .../CacheContinuousQueryDeployableObject.java | 110 ++++++ .../CacheContinuousQueryEventBuffer.java | 93 ++++- .../continuous/CacheContinuousQueryHandler.java | 388 +++++-------------- .../CacheContinuousQueryHandlerV2.java | 6 +- .../CacheContinuousQueryPartitionRecovery.java | 252 ++++++++++++ .../continuous/GridContinuousProcessor.java | 7 +- ...tinuousQueryAsyncFailoverAtomicSelfTest.java | 1 - .../CacheContinuousQueryEventBufferTest.java | 65 ++-- ...ContinuousQueryFailoverAbstractSelfTest.java | 79 ++-- ...niteCacheContinuousQueryBackupQueueTest.java | 13 +- 11 files changed, 732 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee752d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index b1bc7b0,f0640b1..6295b0b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@@ -40,19 -45,47 +46,58 @@@ public class CacheContinuousQueryEventB private AtomicReference<Batch> curBatch = new AtomicReference<>(); /** */ - private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>(); + private ConcurrentLinkedDeque<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque<>(); + + /** */ + private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>(); + + /** + * @param part Partition number. + */ + CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + + /** + * @param updateCntr Acknowledged counter. + */ + void cleanupBackupQueue(Long updateCntr) { + Iterator<CacheContinuousQueryEntry> it = backupQ.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + if (backupEntry.updateCounter() <= updateCntr) + it.remove(); + } + } + + /** + * @return Backup entries. + */ + @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() { + if (!backupQ.isEmpty()) { + ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ; + + backupQ = new ConcurrentLinkedDeque<>(); + + return ret; + } + + return null; + } + /** */ + private final int part; + + public CacheContinuousQueryEventBuffer() { + part = 0; + } + + public CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + /** * @return Initial partition counter. */ @@@ -100,28 -127,18 +139,31 @@@ if (batch == null || cntr < batch.startCntr) { assert entry != null : cntr; + if (backup) + backupQ.add(entry); + + TestDebugLog.addEntryMessage(part, + cntr, + "buffer rcd small start=" + batch.startCntr + + " cntr=" + cntr + ++ ", backup=" + backup + + " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion()); + return entry; } Object res = null; if (cntr <= batch.endCntr) - res = batch.processEvent0(null, cntr, entry); + res = batch.processEvent0(null, cntr, entry, backup); - else + else { + TestDebugLog.addEntryMessage(part, + cntr, + "buffer add pending start=" + batch.startCntr + + " cntr=" + cntr + + " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion()); - pending.put(cntr, entry); + } Batch batch0 = curBatch.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee752d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee752d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ----------------------------------------------------------------------
