Repository: ignite Updated Branches: refs/heads/ignite-426-2-reb 5e31596bb -> b133235de
IGNITE-426 WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b133235d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b133235d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b133235d Branch: refs/heads/ignite-426-2-reb Commit: b133235de92cd7bbb006f507c2763256e41bd96d Parents: 5e31596 Author: nikolay_tikhonov <[email protected]> Authored: Thu Oct 29 13:03:28 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Thu Oct 29 13:03:28 2015 +0300 ---------------------------------------------------------------------- .../query/continuous/CacheContinuousQueryHandler.java | 10 +++------- .../processors/continuous/GridContinuousProcessor.java | 3 ++- .../IgniteCacheContinuousQueryClientReconnectTest.java | 2 +- 3 files changed, 6 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b133235d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e40b2d7..1240ad1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -637,7 +637,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private long lastFiredEvt; /** */ - private AffinityTopologyVersion curTop; + private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; /** */ private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); @@ -669,7 +669,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { synchronized (pendingEvts) { // Received first event. - if (curTop == null) { + if (curTop == AffinityTopologyVersion.NONE) { lastFiredEvt = entry.updateIndex(); curTop = entry.topologyVersion(); @@ -678,11 +678,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } if (curTop.compareTo(entry.topologyVersion()) < 0) { - GridCacheAffinityManager aff = cctx.affinity(); - - if (cctx.affinity().backups(entry.partition(), entry.topologyVersion()).isEmpty() && - !aff.primary(entry.partition(), curTop).id().equals(aff.primary(entry.partition(), - entry.topologyVersion()).id())) { + if (entry.updateIndex() == 1 && !entry.isBackup()) { entries = new ArrayList<>(pendingEvts.size()); for (CacheContinuousQueryEntry evt : pendingEvts.values()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b133235d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 0804ffa..497c6e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; @@ -216,7 +217,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.cache().internalCache(routine.handler().cacheName()); if (interCache != null && idxs != null && interCache.context() != null - && !interCache.isLocal()) { + && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { Map<Integer, Long> map = interCache.context().topology().updateCounters(); for (Map.Entry<Integer, Long> e : map.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b133235d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java index 560f2e0..2e1d78d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java @@ -94,7 +94,7 @@ public class IgniteCacheContinuousQueryClientReconnectTest extends IgniteClientR int keyCnt = 100; - for (int i = 0; i < 30; i++) { + for (int i = 0; i < 10; i++) { lsnr.latch = new CountDownLatch(keyCnt); for (int key = 0; key < keyCnt; key++)
