Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc 52b7b5bad -> 3122352ca


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3122352c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3122352c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3122352c

Branch: refs/heads/ignite-5075-cc
Commit: 3122352ca0ad78f07379455a529195fb925f15e7
Parents: 52b7b5b
Author: sboikov <[email protected]>
Authored: Wed May 24 15:02:36 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed May 24 15:02:36 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryEntry.java   | 18 +++++++++++-
 .../CacheContinuousQueryEventBuffer.java        |  2 +-
 .../continuous/CacheContinuousQueryHandler.java | 29 +++++++++-----------
 .../CacheContinuousQueryEventBufferTest.java    |  4 +--
 4 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index e40f83e..9db92b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -109,7 +109,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     private boolean keepBinary;
 
     /** */
-    public long filteredCnt;
+    private long filteredCnt;
 
     /**
      * Required by {@link Message}.
@@ -207,6 +207,22 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     }
 
     /**
+     * @param filteredCnt Number of entries filtered before this entry.
+     */
+    void filteredCount(long filteredCnt) {
+        assert filteredCnt >= 0 : filteredCnt;
+
+        this.filteredCnt = filteredCnt;
+    }
+
+    /**
+     * @return Number of entries filtered before this entry.
+     */
+    long filteredCount() {
+        return filteredCnt;
+    }
+
+    /**
      * @return If entry filtered then will return light-weight <i><b>new 
entry</b></i> without values and key
      * (avoid to huge memory consumption), otherwise {@code this}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index e3a8eda..7ddea92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -217,7 +217,7 @@ public class CacheContinuousQueryEventBuffer {
                                 CacheContinuousQueryEntry evt0 = 
(CacheContinuousQueryEntry)e;
 
                                 if (!evt0.isFiltered()) {
-                                    evt0.filteredCnt = filtered;
+                                    evt0.filteredCount(filtered);
 
                                     filtered = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ab70f81..18faff4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1097,20 +1097,8 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                 }
 
                 // Check duplicate.
-                if (entry.updateCounter() > lastFiredEvt) {
+                if (entry.updateCounter() > lastFiredEvt)
                     pendingEvts.put(entry.updateCounter(), entry);
-
-                    // TODO
-                    if (entry.filteredCnt > 0) {
-                        long filteredCntr = entry.updateCounter() - 
entry.filteredCnt;
-
-                        for (long i = 0; i < entry.filteredCnt; i++) {
-                            pendingEvts.put(filteredCntr, HOLE);
-
-                            filteredCntr++;
-                        }
-                    }
-                }
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Skip duplicate continuous query message: " 
+ entry);
@@ -1143,6 +1131,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                     }
 
                     LT.warn(log, "Pending events reached max of buffer size 
[cache=" + cctx.name() +
+                        ", bufSize=" + MAX_BUFF_SIZE +
                         ", partId=" + entry.partition() + ']');
 
                     for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); 
i++) {
@@ -1157,12 +1146,20 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                     }
                 }
                 else {
-                    // Elements are consistently.
                     while (iter.hasNext()) {
                         Map.Entry<Long, CacheContinuousQueryEntry> e = 
iter.next();
 
-                        if (e.getKey() == lastFiredEvt + 1) {
-                            ++lastFiredEvt;
+                        CacheContinuousQueryEntry pending = e.getValue();
+
+                        long filtered = pending.filteredCount();
+
+                        boolean fire = entry.updateCounter() == lastFiredEvt + 
1;;
+
+                        if (!fire && filtered > 0)
+                            fire = entry.updateCounter() - filtered == 
lastFiredEvt + 1;
+
+                        if (fire) {
+                            lastFiredEvt = entry.updateCounter();
 
                             if (e.getValue() != HOLE && 
!e.getValue().isFiltered())
                                 entries.add(new CacheContinuousQueryEvent<K, 
V>(cache, cctx, e.getValue()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/3122352c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
index 75a664c..bc32e00 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
@@ -143,7 +143,7 @@ public class CacheContinuousQueryEventBufferTest extends 
GridCommonAbstractTest
                     cntr,
                     null);
 
-                expEntry.filteredCnt = filtered;
+                expEntry.filteredCount(filtered);
 
                 cntr++;
 
@@ -221,7 +221,7 @@ public class CacheContinuousQueryEventBufferTest extends 
GridCommonAbstractTest
             CacheContinuousQueryEntry actualEvt = actualEntries.get(i);
 
             assertEquals(expEvt.updateCounter(), actualEvt.updateCounter());
-            assertEquals(expEvt.filteredCnt, actualEvt.filteredCnt);
+            assertEquals(expEvt.filteredCount(), actualEvt.filteredCount());
         }
     }
 }

Reply via email to