cc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/78725456 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/78725456 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/78725456 Branch: refs/heads/ignite-5075-cc-debug Commit: 78725456c3f19fddab3a8cfce4c700c1f2a77b2f Parents: 7bf63c0 Author: sboikov <[email protected]> Authored: Thu May 25 11:47:09 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 25 11:49:33 2017 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryEventBuffer.java | 14 +++++++------- .../query/continuous/CacheContinuousQueryHandler.java | 6 ++++++ ...cheContinuousQueryAsyncFailoverAtomicSelfTest.java | 1 - .../IgniteCacheContinuousQueryBackupQueueTest.java | 13 +++++++++---- 4 files changed, 22 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/78725456/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 949ea67..f0640b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -41,13 +41,6 @@ public class CacheContinuousQueryEventBuffer { /** */ protected final int part; - /** - * @param part Partition number. - */ - CacheContinuousQueryEventBuffer(int part) { - this.part = part; - } - /** */ private AtomicReference<Batch> curBatch = new AtomicReference<>(); @@ -58,6 +51,13 @@ public class CacheContinuousQueryEventBuffer { private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>(); /** + * @param part Partition number. + */ + CacheContinuousQueryEventBuffer(int part) { + this.part = part; + } + + /** * @param updateCntr Acknowledged counter. */ void cleanupBackupQueue(Long updateCntr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/78725456/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 540f871..9866e7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -954,6 +954,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @Override public void onNodeLeft() { nodeLeft = true; + + for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet()) { + CacheContinuousQueryEventBuffer buf = bufE.getValue(); + + buf.resetBackupQueue(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/78725456/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java index 3cab9e0..d505d19 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicSelfTest.java @@ -25,7 +25,6 @@ import org.apache.ignite.cache.CacheMode; */ public class CacheContinuousQueryAsyncFailoverAtomicSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { - /** {@inheritDoc} */ @Override protected CacheMode cacheMode() { return CacheMode.PARTITIONED; http://git-wip-us.apache.org/repos/asf/ignite/blob/78725456/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java index 26c7d41..85d68d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java @@ -262,11 +262,16 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); if (hnd.isQuery() && hnd.cacheName().equals(CACHE_NAME)) { - Collection<Object> q = GridTestUtils.getFieldValue(hnd, - CacheContinuousQueryHandler.class, "backupQueue"); + Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd, + CacheContinuousQueryHandler.class, "entryBufs"); - if (q != null) - backupQueues.add(q); + for (CacheContinuousQueryEventBuffer buf : map.values()) { + Collection<Object> q = GridTestUtils.getFieldValue(buf, + CacheContinuousQueryEventBuffer.class, "backupQ"); + + if (q != null) + backupQueues.add(q); + } } }
