cc

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff0a2dd8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff0a2dd8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff0a2dd8

Branch: refs/heads/ignite-5075-cc-debug
Commit: ff0a2dd8aeaf621b79c504934f6830537323fcd1
Parents: e6ffd82
Author: sboikov <[email protected]>
Authored: Thu May 25 15:37:47 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu May 25 15:37:47 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java        | 201 ++++++++++++++-----
 .../CacheContinuousQueryPartitionRecovery.java  |  23 ++-
 2 files changed, 167 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ff0a2dd8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index f0640b1..f496c8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -75,15 +76,27 @@ public class CacheContinuousQueryEventBuffer {
      * @return Backup entries.
      */
     @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() {
+        Collection<CacheContinuousQueryEntry> ret;
+
+        List<CacheContinuousQueryEntry> entries = null;
+
+        Batch batch = curBatch.get();
+
+        if (batch != null)
+            entries = batch.backupFlushEntries();
+
         if (!backupQ.isEmpty()) {
-            ConcurrentLinkedDeque<CacheContinuousQueryEntry> ret = 
this.backupQ;
+            if (entries != null)
+                backupQ.addAll(entries);
 
-            backupQ = new ConcurrentLinkedDeque<>();
+            ret = this.backupQ;
 
-            return ret;
+            backupQ = new ConcurrentLinkedDeque<>();
         }
+        else
+            ret = entries;
 
-        return null;
+        return ret;
     }
 
     /**
@@ -122,11 +135,9 @@ public class CacheContinuousQueryEventBuffer {
     private Object process0(long cntr, CacheContinuousQueryEntry entry, 
boolean backup) {
         assert cntr >= 0 : cntr;
 
-        Batch batch = initBatch();
+        Batch batch = initBatch(entry.topologyVersion());
 
         if (batch == null || cntr < batch.startCntr) {
-            assert entry != null : cntr;
-
             if (backup)
                 backupQ.add(entry);
 
@@ -157,9 +168,10 @@ public class CacheContinuousQueryEventBuffer {
     }
 
     /**
+     * @param topVer Current event topology version.
      * @return Current batch.
      */
-    @Nullable private Batch initBatch() {
+    @Nullable private Batch initBatch(AffinityTopologyVersion topVer) {
         Batch batch = curBatch.get();
 
         if (batch != null)
@@ -170,7 +182,7 @@ public class CacheContinuousQueryEventBuffer {
         if (curCntr == -1)
             return null;
 
-        batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]);
+        batch = new Batch(curCntr + 1, 0L, new 
CacheContinuousQueryEntry[BUF_SIZE], topVer);
 
         if (curBatch.compareAndSet(null, batch))
             return batch;
@@ -216,28 +228,117 @@ public class CacheContinuousQueryEventBuffer {
         private int lastProc = -1;
 
         /** */
-        private final Object[] evts;
+        private final CacheContinuousQueryEntry[] entries;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
 
         /**
          * @param filtered Number of filtered events before this batch.
-         * @param evts Events array.
+         * @param entries Entries array.
+         * @param topVer Current event topology version.
          * @param startCntr Start counter.
          */
-        Batch(long startCntr, long filtered, Object[] evts) {
+        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] 
entries, AffinityTopologyVersion topVer) {
             assert startCntr >= 0;
             assert filtered >= 0;
 
             this.startCntr = startCntr;
             this.filtered = filtered;
-            this.evts = evts;
+            this.entries = entries;
+            this.topVer = topVer;
 
             endCntr = startCntr + BUF_SIZE - 1;
         }
 
         /**
+         * @return Entries to send as part of backup queue.
+         */
+        @Nullable synchronized List<CacheContinuousQueryEntry> 
backupFlushEntries() {
+            List<CacheContinuousQueryEntry> res = null;
+
+            long filtered = this.filtered;
+            long cntr = startCntr;
+
+            for (int i = 0; i < entries.length; i++) {
+                CacheContinuousQueryEntry e = entries[i];
+
+                CacheContinuousQueryEntry flushEntry = null;
+
+                if (e == null) {
+                    if (filtered != 0) {
+                        flushEntry = filteredEntry(cntr - 1, filtered - 1);
+
+                        filtered = 0;
+                    }
+                }
+                else {
+                    if (e.isFiltered())
+                        filtered++;
+                    else {
+                        flushEntry = new CacheContinuousQueryEntry(e.cacheId(),
+                            e.eventType(),
+                            e.key(),
+                            e.value(),
+                            e.oldValue(),
+                            e.isKeepBinary(),
+                            e.partition(),
+                            e.updateCounter(),
+                            e.topologyVersion());
+
+                        flushEntry.filteredCount(filtered);
+
+                        filtered = 0;
+                    }
+                }
+
+                if (flushEntry != null) {
+                    if (res == null)
+                        res = new ArrayList<>();
+
+                    res.add(flushEntry);
+                }
+
+                cntr++;
+            }
+
+            if (filtered != 0L) {
+                if (res == null)
+                    res = new ArrayList<>();
+
+                res.add(filteredEntry(cntr - 1, filtered - 1));
+            }
+
+            return res;
+        }
+
+        /**
+         * @param cntr Entry counter.
+         * @param filtered Number of entries filtered before this entry.
+         * @return Entry.
+         */
+        private CacheContinuousQueryEntry filteredEntry(long cntr, long 
filtered) {
+            CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0,
+                null,
+                 null,
+                 null,
+                 null,
+                 false,
+                 part,
+                 cntr,
+                 topVer);
+
+            e.markFiltered();
+
+            e.filteredCount(filtered);
+
+            return e;
+        }
+
+        /**
          * @param res Current result.
-         * @param cntr Event counter.
-         * @param evt Event.
+         * @param cntr Entry counter.
+         * @param entry Entry.
          * @param backup Backup entry flag.
          * @return New result.
          */
@@ -245,60 +346,54 @@ public class CacheContinuousQueryEventBuffer {
         @Nullable private Object processEvent0(
             @Nullable Object res,
             long cntr,
-            CacheContinuousQueryEntry evt,
+            CacheContinuousQueryEntry entry,
             boolean backup) {
             int pos = (int)(cntr - startCntr);
 
             synchronized (this) {
-                evts[pos] = evt;
+                entries[pos] = entry;
 
                 int next = lastProc + 1;
 
                 if (next == pos) {
-                    for (int i = next; i < evts.length; i++) {
-                        Object e = evts[i];
-
-                        if (e != null) {
-                            if (e.getClass() == Long.class)
-                                filtered++;
-                            else {
-                                CacheContinuousQueryEntry evt0 = 
(CacheContinuousQueryEntry)e;
+                    for (int i = next; i < entries.length; i++) {
+                        CacheContinuousQueryEntry entry0 = entries[i];
 
-                                if (!evt0.isFiltered()) {
-                                    evt0.filteredCount(filtered);
+                        if (entry0 != null) {
+                            if (!entry0.isFiltered()) {
+                                entry0.filteredCount(filtered);
 
-                                    filtered = 0;
+                                filtered = 0;
 
-                                    if (res == null) {
-                                        if (backup)
-                                            backupQ.add(evt0);
-                                        else
-                                            res = evt0;
-                                    }
-                                    else {
-                                        assert !backup;
+                                if (res == null) {
+                                    if (backup)
+                                        backupQ.add(entry0);
+                                    else
+                                        res = entry0;
+                                }
+                                else {
+                                    assert !backup;
 
-                                        List<CacheContinuousQueryEntry> 
resList;
+                                    List<CacheContinuousQueryEntry> resList;
 
-                                        if (res instanceof 
CacheContinuousQueryEntry) {
-                                            resList = new ArrayList<>();
+                                    if (res instanceof 
CacheContinuousQueryEntry) {
+                                        resList = new ArrayList<>();
 
-                                            
resList.add((CacheContinuousQueryEntry)res);
-                                        }
-                                        else {
-                                            assert res instanceof List : res;
+                                        
resList.add((CacheContinuousQueryEntry)res);
+                                    }
+                                    else {
+                                        assert res instanceof List : res;
 
-                                            resList = 
(List<CacheContinuousQueryEntry>)res;
-                                        }
+                                        resList = 
(List<CacheContinuousQueryEntry>)res;
+                                    }
 
-                                        resList.add(evt0);
+                                    resList.add(entry0);
 
-                                        res = resList;
-                                    }
+                                    res = resList;
                                 }
-                                else
-                                    filtered++;
                             }
+                            else
+                                filtered++;
 
                             pos = i;
                         }
@@ -312,10 +407,10 @@ public class CacheContinuousQueryEventBuffer {
                     return res;
             }
 
-            if (pos == evts.length -1) {
-                Arrays.fill(evts, null);
+            if (pos == entries.length -1) {
+                Arrays.fill(entries, null);
 
-                Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, 
filtered, evts);
+                Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, 
filtered, entries, entry.topologyVersion());
 
                 curBatch.set(nextBatch);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff0a2dd8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
index 534ce9c..59252d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java
@@ -38,7 +38,13 @@ import org.jetbrains.annotations.Nullable;
  */
 class CacheContinuousQueryPartitionRecovery {
     /** Event which means hole in sequence. */
-    private static final CacheContinuousQueryEntry HOLE = new 
CacheContinuousQueryEntry();
+    private static final CacheContinuousQueryEntry HOLE;
+
+    static  {
+        HOLE = new CacheContinuousQueryEntry();
+
+        HOLE.markFiltered();
+    }
 
     /** */
     private final static int MAX_BUFF_SIZE = 
CacheContinuousQueryHandler.LSNR_MAX_BUF_SIZE;
@@ -53,7 +59,7 @@ class CacheContinuousQueryPartitionRecovery {
     private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
 
     /** */
-    private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new 
TreeMap<>();
+    private final TreeMap<Long, CacheContinuousQueryEntry> pendingEvts = new 
TreeMap<>();
 
     /**
      * @param log Logger.
@@ -212,6 +218,8 @@ class CacheContinuousQueryPartitionRecovery {
                 }
             }
             else {
+                boolean skippedFiltered = false;
+
                 while (iter.hasNext()) {
                     Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
@@ -232,9 +240,16 @@ class CacheContinuousQueryPartitionRecovery {
 
                         iter.remove();
                     }
-                    else
-                        break;
+                    else {
+                        if (pending.isFiltered())
+                            skippedFiltered = true;
+                        else
+                            break;
+                    }
                 }
+
+                if (skippedFiltered)
+                    pendingEvts.headMap(lastFiredEvt).clear();
             }
         }
 

Reply via email to