Merge branch 'ignite-5075-cc' into ignite-5075-cc-debug

# Conflicts:
#       modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
#       modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ee752d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ee752d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ee752d8

Branch: refs/heads/ignite-5075-cc-debug
Commit: 8ee752d87daa3b9337f01dce3008f04c0bcc2238
Parents: 460ba11 2c68adb
Author: sboikov <[email protected]>
Authored: Thu May 25 13:21:02 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu May 25 13:21:02 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryAcknowledgeBuffer.java  | 120 ++++++
 .../CacheContinuousQueryDeployableObject.java   | 110 ++++++
 .../CacheContinuousQueryEventBuffer.java        |  93 ++++-
 .../continuous/CacheContinuousQueryHandler.java | 388 +++++--------------
 .../CacheContinuousQueryHandlerV2.java          |   6 +-
 .../CacheContinuousQueryPartitionRecovery.java  | 252 ++++++++++++
 .../continuous/GridContinuousProcessor.java     |   7 +-
 ...tinuousQueryAsyncFailoverAtomicSelfTest.java |   1 -
 .../CacheContinuousQueryEventBufferTest.java    |  65 ++--
 ...ContinuousQueryFailoverAbstractSelfTest.java |  79 ++--
 ...niteCacheContinuousQueryBackupQueueTest.java |  13 +-
 11 files changed, 732 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee752d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index b1bc7b0,f0640b1..6295b0b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@@ -40,19 -45,47 +46,58 @@@ public class CacheContinuousQueryEventB
      private AtomicReference<Batch> curBatch = new AtomicReference<>();
  
      /** */
-     private ConcurrentSkipListMap<Long, Object> pending = new ConcurrentSkipListMap<>();
+     private ConcurrentLinkedDeque<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque<>();
+ 
+     /** */
+     private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+ 
+     /**
+      * @param part Partition number.
+      */
+     CacheContinuousQueryEventBuffer(int part) {
+         this.part = part;
+     }
+ 
+     /**
+      * @param updateCntr Acknowledged counter.
+      */
+     void cleanupBackupQueue(Long updateCntr) {
+         Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
+ 
+         while (it.hasNext()) {
+             CacheContinuousQueryEntry backupEntry = it.next();
+ 
+             if (backupEntry.updateCounter() <= updateCntr)
+                 it.remove();
+         }
+     }
+ 
+     /**
+      * @return Backup entries.
+      */
+     @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() {
+         if (!backupQ.isEmpty()) {
+             ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = this.backupQ;
+ 
+             backupQ = new ConcurrentLinkedDeque<>();
+ 
+             return ret;
+         }
+ 
+         return null;
+     }
  
 +    /** */
 +    private final int part;
 +
 +    public CacheContinuousQueryEventBuffer() {
 +        part = 0;
 +    }
 +
 +    public CacheContinuousQueryEventBuffer(int part) {
 +        this.part = part;
 +    }
 +
      /**
       * @return Initial partition counter.
       */
@@@ -100,28 -127,18 +139,31 @@@
          if (batch == null || cntr < batch.startCntr) {
              assert entry != null : cntr;
  
+             if (backup)
+                 backupQ.add(entry);
+ 
 +            TestDebugLog.addEntryMessage(part,
 +                cntr,
 +                "buffer rcd small start=" + batch.startCntr +
 +                    " cntr=" + cntr +
++                    ", backup=" + backup +
 +                    " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion());
 +
              return entry;
          }
  
          Object res = null;
  
          if (cntr <= batch.endCntr)
-             res = batch.processEvent0(null, cntr, entry);
+             res = batch.processEvent0(null, cntr, entry, backup);
 -        else
 +        else {
 +            TestDebugLog.addEntryMessage(part,
 +                cntr,
 +                "buffer add pending start=" + batch.startCntr +
 +                    " cntr=" + cntr +
 +                    " topVer=" + ((CacheContinuousQueryEntry)entry).topologyVersion());
- 
              pending.put(cntr, entry);
 +        }
  
          Batch batch0 = curBatch.get();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee752d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee752d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------

Reply via email to