Repository: ignite
Updated Branches:
  refs/heads/ignite-2791 fa97a07c0 -> 716cce428


IGNITE-2791 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/716cce42
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/716cce42
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/716cce42

Branch: refs/heads/ignite-2791
Commit: 716cce4289d066fad9a534e27942fb61167132da
Parents: fa97a07
Author: nikolay_tikhonov <[email protected]>
Authored: Mon Mar 21 15:50:10 2016 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Mon Mar 21 15:50:10 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   2 +-
 .../internal/GridMessageListenHandler.java      |   2 +-
 .../continuous/CacheContinuousQueryHandler.java |  68 +++++---
 .../continuous/CacheContinuousQueryManager.java |  10 ++
 .../continuous/GridContinuousHandler.java       |   2 +-
 .../continuous/GridContinuousProcessor.java     |  39 +----
 .../StartRoutineAckDiscoveryMessage.java        |  22 ++-
 .../StartRoutineDiscoveryMessage.java           |  20 ++-
 .../GridCacheContinuousQueryConcurrentTest.java | 164 ++++++++++++++++++-
 9 files changed, 259 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..bc43195 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,7 +136,7 @@ class GridEventConsumeHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, 
Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, 
Map<UUID, Map<Integer, Long>> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..089091b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -125,7 +125,7 @@ public class GridMessageListenHandler implements 
GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, 
Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, 
Map<UUID, Map<Integer, Long>> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 10fbd89..23cd48c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
 
@@ -146,10 +148,10 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     private transient int cacheId;
 
     /** */
-    private Map<Integer, Long> initUpdCntrs;
+    private transient Map<UUID, Map<Integer, Long>> initUpdCntrs;
 
     /** */
-    private AffinityTopologyVersion initTopVer;
+    private transient AffinityTopologyVersion initTopVer;
 
     /** */
     private transient boolean ignoreClsNotFound;
@@ -264,7 +266,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, 
Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, 
Map<UUID, Map<Integer, Long>> cntrs) {
         this.initTopVer = topVer;
         this.initUpdCntrs = cntrs;
     }
@@ -296,20 +298,6 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
 
         assert !skipPrimaryCheck || loc;
 
-        final GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-        if (!internal && cctx != null && initUpdCntrs != null) {
-            Map<Integer, Long> map = cctx.topology().updateCounters();
-
-            for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                Long cntr0 = initUpdCntrs.get(e.getKey());
-                Long cntr1 = e.getValue();
-
-                if (cntr0 == null || cntr1 > cntr0)
-                    initUpdCntrs.put(e.getKey(), cntr1);
-            }
-        }
-
         CacheContinuousQueryListener<K, V> lsnr = new 
CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -561,6 +549,16 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             entry.prepareMarshal(cctx);
     }
 
+    /**
+     * Waits for the affinity ready future of the initial topology version, then ensures a partition recovery exists for every partition.
+     */
+    public void waitTopologyFuture(GridKernalContext ctx) throws 
IgniteCheckedException {
+        cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+
+        for (int partId = 0; partId < 
cacheContext(ctx).affinity().partitions(); partId++)
+            getOrCreatePartitionRecovery(ctx, partId);
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, 
GridKernalContext ctx) {
         // No-op.
@@ -668,19 +666,45 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         if (e.updateCounter() == -1L)
             return F.asList(e);
 
-        PartitionRecovery rec = rcvs.get(e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, 
e.partition());
+
+        return rec.collectEntries(e);
+    }
+
+    /**
+     * @param ctx Kernal context used to resolve the cache context and logger.
+     * @param partId Partition id.
+     * @return Existing or newly created partition recovery (created without an initial counter when none is known).
+     */
+    @NotNull private PartitionRecovery 
getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+        PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
-            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer,
-                initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+            Long partCntr = null;
 
-            PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+            if (initUpdCntrs != null && initTopVer != null && 
!initTopVer.equals(AffinityTopologyVersion.NONE)) {
+                GridCacheAffinityManager aff = cacheContext(ctx).affinity();
+
+                for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                    Map<Integer, Long> map = initUpdCntrs.get(node.id());
+
+                    if (map != null) {
+                        partCntr = map.get(partId);
+
+                        break;
+                    }
+                }
+            }
+
+            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, 
partCntr);
+
+            PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 
             if (oldRec != null)
                 rec = oldRec;
         }
 
-        return rec.collectEntries(e);
+        return rec;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 353043f..2847063 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -649,6 +649,16 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             autoUnsubscribe,
             pred).get();
 
+        try {
+            if (hnd.isQuery() && cctx.userCache())
+                hnd.waitTopologyFuture(cctx.kernalContext());
+        }
+        catch (IgniteCheckedException e) {
+            log.warning("Failed to start continuous query.", e);
+
+            cctx.kernalContext().continuous().stopRoutine(id);
+        }
+
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = 
cctx.cache().allEntries().iterator();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..2ab75d5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -157,5 +157,5 @@ public interface GridContinuousHandler extends 
Externalizable, Cloneable {
      * @param cntrs Init state for partition counters.
      * @param topVer Topology version.
      */
-    public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, 
Long> cntrs);
+    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, 
Map<Integer, Long>> cntrs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index c1f4d22..f29d413 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -219,32 +219,19 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                             // Update partition counters.
                             if (routine != null && 
routine.handler().isQuery()) {
-                                Map<Integer, Long> cntrs = 
msg.updateCounters();
+                                Map<UUID, Map<Integer, Long>> cnrtsPerNode = 
msg.updateCountersPerNode();
 
                                 GridCacheAdapter<Object, Object> interCache =
                                     
ctx.cache().internalCache(routine.handler().cacheName());
 
-                                if (interCache != null && cntrs != null && 
interCache.context() != null
+                                if (interCache != null && cnrtsPerNode != null 
&& interCache.context() != null
                                     && !interCache.isLocal() && 
!CU.clientNode(ctx.grid().localNode())) {
                                     GridCacheContext<Object, Object> cctx = 
interCache.context();
 
-                                    Map<Integer, Long> map = 
cctx.topology().updateCounters();
-
-                                    for (Map.Entry<Integer, Long> e : 
map.entrySet()) {
-                                        if 
(!ctx.cache().context().exchange().hasPendingExchange() &&
-                                            
!cctx.affinity().primary(cctx.localNode(), e.getKey(),
-                                                
cctx.affinity().affinityTopologyVersion()))
-                                            continue;
-
-                                        Long cntr0 = cntrs.get(e.getKey());
-                                        Long cntr1 = e.getValue();
-
-                                        if (cntr0 == null || cntr1 > cntr0)
-                                            cntrs.put(e.getKey(), cntr1);
-                                    }
+                                    cnrtsPerNode.put(ctx.localNodeId(), 
cctx.topology().updateCounters());
                                 }
 
-                                routine.handler().updateCounters(topVer, 
cntrs);
+                                routine.handler().updateCounters(topVer, 
cnrtsPerNode);
                             }
 
                             fut.onRemoteRegistered();
@@ -929,22 +916,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             if (proc != null) {
                 GridCacheAdapter cache = 
ctx.cache().internalCache(hnd0.cacheName());
 
-                if (cache != null && !cache.isLocal()) {
-                    GridCacheContext cctx = cache.context();
-
-                    Map<Integer, Long> cntrs = 
cache.context().topology().updateCounters();
-
-                    AffinityTopologyVersion topVer = 
cctx.affinity().affinityTopologyVersion();
-
-                    if 
(!ctx.cache().context().exchange().hasPendingExchange()) {
-                        for (Integer partId : new ArrayList<>(cntrs.keySet())) 
{
-                            if (!cctx.affinity().primary(cctx.localNode(), 
partId, topVer))
-                                cntrs.remove(partId);
-                        }
-                    }
-
-                    req.addUpdateCounters(cntrs);
-                }
+                if (cache != null && !cache.isLocal())
+                    req.addUpdateCounters(ctx.localNodeId(), 
cache.context().topology().updateCounters());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 9644372..ca34b27 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends 
AbstractContinuousMessage {
     private final Map<UUID, IgniteCheckedException> errs;
 
     /** */
+    @GridToStringExclude
     private final Map<Integer, Long> updateCntrs;
 
+    /** */
+    @GridToStringExclude
+    private final Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /**
      * @param routineId Routine id.
      * @param errs Errs.
+     * @param cntrs Partition counters.
+     * @param cntrsPerNode Partition counters per node.
      */
-    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, 
IgniteCheckedException> errs,
-        Map<Integer, Long> cntrs) {
+    public StartRoutineAckDiscoveryMessage(UUID routineId,
+        Map<UUID, IgniteCheckedException> errs,
+        Map<Integer, Long> cntrs,
+        Map<UUID, Map<Integer, Long>> cntrsPerNode) {
         super(routineId);
 
         this.errs = new HashMap<>(errs);
         this.updateCntrs = cntrs;
+        this.updateCntrsPerNode = cntrsPerNode;
     }
 
     /** {@inheritDoc} */
@@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends 
AbstractContinuousMessage {
     }
 
     /**
+     * @return Update counters for partitions per each node.
+     */
+    public Map<UUID, Map<Integer, Long>> updateCountersPerNode() {
+        return updateCntrsPerNode;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index ff037d4..4df9599 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends 
AbstractContinuousMessage {
     /** */
     private Map<Integer, Long> updateCntrs;
 
+    /** */
+    private Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends 
AbstractContinuousMessage {
     /**
      * @param cntrs Update counters.
      */
-    public void addUpdateCounters(Map<Integer, Long> cntrs) {
+    private void addUpdateCounters(Map<Integer, Long> cntrs) {
         if (updateCntrs == null)
             updateCntrs = new HashMap<>();
 
@@ -86,6 +89,19 @@ public class StartRoutineDiscoveryMessage extends 
AbstractContinuousMessage {
     }
 
     /**
+     * @param nodeId Local node ID.
+     * @param cntrs Update counters.
+     */
+    public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
+        if (updateCntrsPerNode == null)
+            updateCntrsPerNode = new HashMap<>();
+
+        // Keep put() out of the assert expression: asserts are off by default and the put would be skipped.
+        Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
+        assert old == null : "Update counters were already added for node: " + nodeId;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -106,7 +122,7 @@ public class StartRoutineDiscoveryMessage extends 
AbstractContinuousMessage {
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
-        return new StartRoutineAckDiscoveryMessage(routineId, errs, 
updateCntrs);
+        return new StartRoutineAckDiscoveryMessage(routineId, errs, 
updateCntrs, updateCntrsPerNode);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/716cce42/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
index 8803e8e..4995c6f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
@@ -38,17 +39,22 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static javax.cache.configuration.FactoryBuilder.factoryOf;
 
 /**
@@ -59,7 +65,7 @@ public class GridCacheContinuousQueryConcurrentTest extends 
GridCommonAbstractTe
     private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final int NODES = 2;
+    private static final int NODES = 3;
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
@@ -81,6 +87,11 @@ public class GridCacheContinuousQueryConcurrentTest extends 
GridCommonAbstractTe
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        cfg.setPeerClassLoadingEnabled(false);
+
+        if (gridName.endsWith(String.valueOf(NODES)))
+            cfg.setClientMode(ThreadLocalRandom.current().nextBoolean());
+
         return cfg;
     }
 
@@ -94,15 +105,43 @@ public class GridCacheContinuousQueryConcurrentTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception If failed.
      */
+    public void testRestartReplicatedTx() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, 
CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartReplicated() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, 
CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartition() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, 
CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartitionTx() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, 
CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testReplicatedAtomic() throws Exception {
-        testRegistration(cacheConfiguration(CacheMode.REPLICATED, 
CacheAtomicityMode.ATOMIC, 1));
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, 
CacheAtomicityMode.ATOMIC, 2));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPartitionTx() throws Exception {
-        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, 
CacheAtomicityMode.TRANSACTIONAL, 1));
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, 
CacheAtomicityMode.TRANSACTIONAL, 2));
     }
 
     /**
@@ -135,7 +174,7 @@ public class GridCacheContinuousQueryConcurrentTest extends 
GridCommonAbstractTe
         try {
             final IgniteCache<Integer, String> cache = 
grid(0).getOrCreateCache(ccfg);
 
-            for (int i = 0; i < 100; i++) {
+            for (int i = 0; i < 30; i++) {
                 log.info("Start iteration: " + i);
 
                 final int i0 = i;
@@ -155,7 +194,97 @@ public class GridCacheContinuousQueryConcurrentTest 
extends GridCommonAbstractTe
                                 if (log.isDebugEnabled())
                                     log.debug("Started cont query count: " + 
count);
 
-                                if (++count == conQryCnt)
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                assert U.await(latch, 1, MINUTES);
+
+                cache.put(i, "v");
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(2, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartRegistration(CacheConfiguration ccfg) throws 
Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        final AtomicBoolean stopRes = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        try {
+            final IgniteCache<Integer, String> cache = 
grid(0).getOrCreateCache(ccfg);
+
+            restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stopRes.get()) {
+                        startGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == 
NODES + 1;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+
+                        stopGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == 
NODES;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+                    }
+
+                    return null;
+                }
+            });
+
+            U.sleep(100);
+
+            for (int i = 0; i < 30; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() 
throws Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new 
ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + 
count);
+
+                                if (++count >= conQryCnt)
                                     latch.countDown();
                             }
 
@@ -167,16 +296,28 @@ public class GridCacheContinuousQueryConcurrentTest 
extends GridCommonAbstractTe
 
                 cache.put(i, "v");
 
+                assertEquals("v", cache.get(i));
+
                 stop.set(true);
 
                 List<IgniteFuture<String>> contQries = fut.get();
 
-                for (int j = 0; j < contQries.size(); j++)
-                    contQries.get(j).get(2, TimeUnit.SECONDS);
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(5, TimeUnit.SECONDS);
             }
         }
         finally {
             execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+
+            if (restartFut != null) {
+                stopRes.set(true);
+
+                restartFut.get();
+
+                stopGrid(NODES);
+            }
         }
     }
 
@@ -201,7 +342,13 @@ public class GridCacheContinuousQueryConcurrentTest 
extends GridCommonAbstractTe
 
         promise.listen(new IgniteInClosure<IgniteFuture<String>>() {
             @Override public void apply(IgniteFuture<String> future) {
-                cache.deregisterCacheEntryListener(cfg);
+                GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.deregisterCacheEntryListener(cfg);
+
+                        return null;
+                    }
+                });
             }
         });
 
@@ -262,6 +409,7 @@ public class GridCacheContinuousQueryConcurrentTest extends 
GridCommonAbstractTe
         cfg.setAtomicityMode(atomicMode);
         
cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
         cfg.setBackups(backups);
+        cfg.setReadFromBackup(false);
 
         return cfg;
     }

Reply via email to