cc

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31f32998
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31f32998
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31f32998

Branch: refs/heads/ignite-5075-cc-debug
Commit: 31f32998c8cbcd51797d34069711e27db65e48ab
Parents: 7bf63c0
Author: sboikov <[email protected]>
Authored: Thu May 25 11:47:09 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu May 25 11:47:09 2017 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryEventBuffer.java   | 14 +++++++-------
 .../query/continuous/CacheContinuousQueryHandler.java |  6 ++++++
 .../IgniteCacheContinuousQueryBackupQueueTest.java    | 13 +++++++++----
 3 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 949ea67..f0640b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -41,13 +41,6 @@ public class CacheContinuousQueryEventBuffer {
     /** */
     protected final int part;
 
-    /**
-     * @param part Partition number.
-     */
-    CacheContinuousQueryEventBuffer(int part) {
-        this.part = part;
-    }
-
     /** */
     private AtomicReference<Batch> curBatch = new AtomicReference<>();
 
@@ -58,6 +51,13 @@ public class CacheContinuousQueryEventBuffer {
     private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = 
new ConcurrentSkipListMap<>();
 
     /**
+     * @param part Partition number.
+     */
+    CacheContinuousQueryEventBuffer(int part) {
+        this.part = part;
+    }
+
+    /**
      * @param updateCntr Acknowledged counter.
      */
     void cleanupBackupQueue(Long updateCntr) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 540f871..9866e7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -954,6 +954,12 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     /** {@inheritDoc} */
     @Override public void onNodeLeft() {
         nodeLeft = true;
+
+        for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : 
entryBufs.entrySet()) {
+            CacheContinuousQueryEventBuffer buf = bufE.getValue();
+
+            buf.resetBackupQueue();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index 26c7d41..85d68d3 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -262,11 +262,16 @@ public class IgniteCacheContinuousQueryBackupQueueTest 
extends GridCommonAbstrac
             GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, 
"hnd");
 
             if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) {
-                Collection<Object> q = GridTestUtils.getFieldValue(hnd,
-                    CacheContinuousQueryHandler.class, "backupQueue");
+                Map<Integer, CacheContinuousQueryEventBuffer> map = 
GridTestUtils.getFieldValue(hnd,
+                    CacheContinuousQueryHandler.class, "entryBufs");
 
-                if (q != null)
-                    backupQueues.add(q);
+                for (CacheContinuousQueryEventBuffer buf : map.values()) {
+                    Collection<Object> q = GridTestUtils.getFieldValue(buf,
+                        CacheContinuousQueryEventBuffer.class, "backupQ");
+
+                    if (q != null)
+                        backupQueues.add(q);
+                }
             }
         }
 

Reply via email to