cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31f32998 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31f32998 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31f32998 Branch: refs/heads/ignite-5075-cc-debug Commit: 31f32998c8cbcd51797d34069711e27db65e48ab Parents: 7bf63c0 Author: sboikov <[email protected]> Authored: Thu May 25 11:47:09 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 11:47:09 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryEventBuffer.java | 14 +++++++------- .../query/continuous/CacheContinuousQueryHandler.java | 6 ++++++ .../IgniteCacheContinuousQueryBackupQueueTest.java | 13 +++++++++---- 3 files changed, 22 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 949ea67..f0640b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -41,13 +41,6 @@ public class CacheContinuousQueryEventBuffer { /** */ protected final int part; - /** - * @param part Partition number. - */ - CacheContinuousQueryEventBuffer(int part) { - this.part = part; - } - /** */ private AtomicReference<Batch> curBatch = new AtomicReference<>(); @@ -58,6 +51,13 @@ public class CacheContinuousQueryEventBuffer { private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>(); /** + * @param part Partition number. + */ + CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + + /** * @param updateCntr Acknowledged counter. */ void cleanupBackupQueue(Long updateCntr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 540f871..9866e7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -954,6 +954,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @Override public void onNodeLeft() { nodeLeft = true; + + for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { + CacheContinuousQueryEventBuffer buf = bufE.getValue(); + + buf.resetBackupQueue(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/31f32998/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java index 26c7d41..85d68d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java @@ -262,11 +262,16 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) { - Collection<Object> q = GridTestUtils.getFieldValue(hnd, - CacheContinuousQueryHandler.class, "backupQueue"); + Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd, + CacheContinuousQueryHandler.class, "entryBufs"); - if (q != null) - backupQueues.add(q); + for (CacheContinuousQueryEventBuffer buf : map.values()) { + Collection<Object> q = GridTestUtils.getFieldValue(buf, + CacheContinuousQueryEventBuffer.class, "backupQ"); + + if (q != null) + backupQueues.add(q); + } } }
