Repository: ignite Updated Branches: refs/heads/ignite-2791 5692df6ad -> 4731a6bb4
ignite-2791 Review Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4731a6bb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4731a6bb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4731a6bb Branch: refs/heads/ignite-2791 Commit: 4731a6bb4b7a55281b9cf5a58ade51a42ab5c692 Parents: 5692df6 Author: sboikov <[email protected]> Authored: Mon Mar 21 17:20:04 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 21 17:20:04 2016 +0300 ---------------------------------------------------------------------- .../query/continuous/CacheContinuousQueryHandler.java | 12 +++++++----- .../processors/continuous/GridContinuousProcessor.java | 6 ++---- .../continuous/StartRoutineDiscoveryMessage.java | 4 +++- 3 files changed, 12 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4731a6bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 23cd48c..e794f46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -148,10 +148,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler private transient int cacheId; /** */ - private transient Map<UUID, Map<Integer, Long>> initUpdCntrs; + private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrs; /** */ - private transient AffinityTopologyVersion initTopVer; + private transient volatile AffinityTopologyVersion initTopVer; /** */ private transient boolean ignoreClsNotFound; @@ -267,8 +267,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** {@inheritDoc} */ @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) { - this.initTopVer = topVer; this.initUpdCntrs = cntrs; + this.initTopVer = topVer; } /** {@inheritDoc} */ @@ -682,7 +682,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (rec == null) { Long partCntr = null; - if (initTopVer != null && !initTopVer.equals(AffinityTopologyVersion.NONE)) { + AffinityTopologyVersion initTopVer0 = initTopVer; + + if (initTopVer0 != null && !initTopVer0.equals(AffinityTopologyVersion.NONE)) { GridCacheAffinityManager aff = cacheContext(ctx).affinity(); for (ClusterNode node : aff.nodes(partId, initTopVer)) { @@ -696,7 +698,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } } - rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, partCntr); + rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr); PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); http://git-wip-us.apache.org/repos/asf/ignite/blob/4731a6bb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index f29d413..80ab092 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -224,12 +224,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridCacheAdapter<Object, Object> interCache = ctx.cache().internalCache(routine.handler().cacheName()); - if (interCache != null && cnrtsPerNode != null && interCache.context() != null - && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { - GridCacheContext<Object, Object> cctx = interCache.context(); + GridCacheContext cctx = interCache != null ? interCache.context() : null; + if (cctx != null && cnrtsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) cnrtsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); - } routine.handler().updateCounters(topVer, cnrtsPerNode); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4731a6bb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index ea966e3..24eb050 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -98,7 +98,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { if (updateCntrsPerNode == null) updateCntrsPerNode = new HashMap<>(); - assert updateCntrsPerNode.put(nodeId, cntrs) == null; + Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs); + + assert old == null : old; } /**
