Repository: ignite
Updated Branches:
  refs/heads/ignite-2791 5692df6ad -> 4731a6bb4


ignite-2791 Review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4731a6bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4731a6bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4731a6bb

Branch: refs/heads/ignite-2791
Commit: 4731a6bb4b7a55281b9cf5a58ade51a42ab5c692
Parents: 5692df6
Author: sboikov <[email protected]>
Authored: Mon Mar 21 17:20:04 2016 +0300
Committer: sboikov <[email protected]>
Committed: Mon Mar 21 17:20:04 2016 +0300

----------------------------------------------------------------------
 .../query/continuous/CacheContinuousQueryHandler.java   | 12 +++++++-----
 .../processors/continuous/GridContinuousProcessor.java  |  6 ++----
 .../continuous/StartRoutineDiscoveryMessage.java        |  4 +++-
 3 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4731a6bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 23cd48c..e794f46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -148,10 +148,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient int cacheId;
 
     /** */
-    private transient Map<UUID, Map<Integer, Long>> initUpdCntrs;
+    private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrs;
 
     /** */
-    private transient AffinityTopologyVersion initTopVer;
+    private transient volatile AffinityTopologyVersion initTopVer;
 
     /** */
     private transient boolean ignoreClsNotFound;
@@ -267,8 +267,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
     /** {@inheritDoc} */
     @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) {
-        this.initTopVer = topVer;
         this.initUpdCntrs = cntrs;
+        this.initTopVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -682,7 +682,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (rec == null) {
             Long partCntr = null;
 
-            if (initTopVer != null && !initTopVer.equals(AffinityTopologyVersion.NONE)) {
+            AffinityTopologyVersion initTopVer0 = initTopVer;
+
+            if (initTopVer0 != null && !initTopVer0.equals(AffinityTopologyVersion.NONE)) {
                 GridCacheAffinityManager aff = cacheContext(ctx).affinity();
 
                 for (ClusterNode node : aff.nodes(partId, initTopVer)) {
@@ -696,7 +698,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 }
             }
 
-            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, partCntr);
+            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);
 
             PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4731a6bb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f29d413..80ab092 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -224,12 +224,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 GridCacheAdapter<Object, Object> interCache =
                                     ctx.cache().internalCache(routine.handler().cacheName());
 
-                                if (interCache != null && cnrtsPerNode != null && interCache.context() != null
-                                    && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
-                                    GridCacheContext<Object, Object> cctx = interCache.context();
+                                GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
+                                if (cctx != null && cnrtsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
                                     cnrtsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
-                                }
 
                                 routine.handler().updateCounters(topVer, cnrtsPerNode);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4731a6bb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index ea966e3..24eb050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -98,7 +98,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
         if (updateCntrsPerNode == null)
             updateCntrsPerNode = new HashMap<>();
 
-        assert updateCntrsPerNode.put(nodeId, cntrs) == null;
+        Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
+
+        assert old == null : old;
     }
 
     /**

Reply via email to