Repository: ignite
Updated Branches:
  refs/heads/ignite-426-2-reb 5e31596bb -> b133235de


IGNITE-426 WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b133235d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b133235d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b133235d

Branch: refs/heads/ignite-426-2-reb
Commit: b133235de92cd7bbb006f507c2763256e41bd96d
Parents: 5e31596
Author: nikolay_tikhonov <[email protected]>
Authored: Thu Oct 29 13:03:28 2015 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Thu Oct 29 13:03:28 2015 +0300

----------------------------------------------------------------------
 .../query/continuous/CacheContinuousQueryHandler.java     | 10 +++-------
 .../processors/continuous/GridContinuousProcessor.java    |  3 ++-
 .../IgniteCacheContinuousQueryClientReconnectTest.java    |  2 +-
 3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b133235d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e40b2d7..1240ad1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -637,7 +637,7 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
         private long lastFiredEvt;
 
         /** */
-        private AffinityTopologyVersion curTop;
+        private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE;
 
         /** */
         private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new 
TreeMap<>();
@@ -669,7 +669,7 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
 
             synchronized (pendingEvts) {
                 // Received first event.
-                if (curTop == null) {
+                if (curTop == AffinityTopologyVersion.NONE) {
                     lastFiredEvt = entry.updateIndex();
 
                     curTop = entry.topologyVersion();
@@ -678,11 +678,7 @@ class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler {
                 }
 
                 if (curTop.compareTo(entry.topologyVersion()) < 0) {
-                    GridCacheAffinityManager aff = cctx.affinity();
-
-                    if (cctx.affinity().backups(entry.partition(), 
entry.topologyVersion()).isEmpty() &&
-                        !aff.primary(entry.partition(), 
curTop).id().equals(aff.primary(entry.partition(),
-                            entry.topologyVersion()).id())) {
+                    if (entry.updateIndex() == 1 && !entry.isBackup()) {
                         entries = new ArrayList<>(pendingEvts.size());
 
                         for (CacheContinuousQueryEntry evt : 
pendingEvts.values()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b133235d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0804ffa..497c6e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -61,6 +61,7 @@ import 
org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -216,7 +217,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                                     
ctx.cache().internalCache(routine.handler().cacheName());
 
                                 if (interCache != null && idxs != null && 
interCache.context() != null
-                                    && !interCache.isLocal()) {
+                                    && !interCache.isLocal() && 
!CU.clientNode(ctx.grid().localNode())) {
                                     Map<Integer, Long> map = 
interCache.context().topology().updateCounters();
 
                                     for (Map.Entry<Integer, Long> e : 
map.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b133235d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
index 560f2e0..2e1d78d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientReconnectTest.java
@@ -94,7 +94,7 @@ public class IgniteCacheContinuousQueryClientReconnectTest 
extends IgniteClientR
 
         int keyCnt = 100;
 
-        for (int i = 0; i < 30; i++) {
+        for (int i = 0; i < 10; i++) {
             lsnr.latch = new CountDownLatch(keyCnt);
 
             for (int key = 0; key < keyCnt; key++)

Reply via email to