Repository: ignite
Updated Branches:
  refs/heads/ignite-2515 1bfd9e470 -> 0295a56e1


IGNITE-2515 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0295a56e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0295a56e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0295a56e

Branch: refs/heads/ignite-2515
Commit: 0295a56e174713bdf19dc79dd27e37b16178a931
Parents: 1bfd9e4
Author: nikolay_tikhonov <[email protected]>
Authored: Mon Feb 15 21:05:51 2016 +0300
Committer: nikolay_tikhonov <[email protected]>
Committed: Mon Feb 15 21:05:51 2016 +0300

----------------------------------------------------------------------
 .../cache/query/CacheQueryEntryEvent.java       |  47 +++
 .../continuous/CacheContinuousQueryEvent.java   |  11 +-
 .../continuous/CacheContinuousQueryHandler.java |  67 ++--
 ...acheContinuousQueryRandomOperationsTest.java | 352 +++++++++++++------
 4 files changed, 353 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
new file mode 100644
index 0000000..6fda0cf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.EventType;
+
+/**
+ * A cache continuous query entry event.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
+public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> 
{
+    /**
+     * Constructs a cache entry event from a given cache as source.
+     *
+     * @param source the cache that originated the event
+     * @param eventType Event type.
+     */
+    public CacheQueryEntryEvent(Cache source, EventType eventType) {
+        super(source, eventType);
+    }
+
+    /**
+     * Gets the partition update counter for this event; the counter is incremented on each update.
+     *
+     * @return Value of counter for this event.
+     */
+    public abstract long getPartitionUpdateCounter();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d1c7c28..eab5dbd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import javax.cache.Cache;
-import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 /**
  * Continuous query event.
  */
-class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -77,11 +77,14 @@ class CacheContinuousQueryEvent<K, V> extends 
CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long getPartitionUpdateCounter() {
+        return e.updateCounter();
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> T unwrap(Class<T> cls) {
         if (cls.isAssignableFrom(getClass()))
             return cls.cast(this);
-        else if (cls == Long.class)
-            return cls.cast(e.updateCounter());
 
         throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + cls);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 267948d..cf622a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -368,11 +368,55 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
 
                                     if (!internal && !skipPrimaryCheck)
                                         
sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+
+                                    if (recordIgniteEvt) {
+                                        for (CacheEntryEvent<? extends K, ? 
extends V> e : evts) {
+                                            ctx.event().record(new 
CacheQueryReadEvent<>(
+                                                ctx.discovery().localNode(),
+                                                "Continuous query executed.",
+                                                EVT_CACHE_QUERY_OBJECT_READ,
+                                                
CacheQueryType.CONTINUOUS.name(),
+                                                cacheName,
+                                                null,
+                                                null,
+                                                null,
+                                                rmtFilter,
+                                                null,
+                                                nodeId,
+                                                taskName(),
+                                                e.getKey(),
+                                                e.getValue(),
+                                                e.getOldValue(),
+                                                null
+                                            ));
+                                        }
+                                    }
                                 }
                             }
                             else {
-                                if (!entry.isFiltered())
+                                if (!entry.isFiltered()) {
                                     locLsnr.onUpdated(F.<CacheEntryEvent<? 
extends K, ? extends V>>asList(evt));
+
+                                    if (recordIgniteEvt)
+                                        ctx.event().record(new 
CacheQueryReadEvent<>(
+                                            ctx.discovery().localNode(),
+                                            "Continuous query executed.",
+                                            EVT_CACHE_QUERY_OBJECT_READ,
+                                            CacheQueryType.CONTINUOUS.name(),
+                                            cacheName,
+                                            null,
+                                            null,
+                                            null,
+                                            rmtFilter,
+                                            null,
+                                            nodeId,
+                                            taskName(),
+                                            evt.getKey(),
+                                            evt.getValue(),
+                                            evt.getOldValue(),
+                                            null
+                                        ));
+                                }
                             }
                         }
                         else {
@@ -406,27 +450,6 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                 catch (IgniteCheckedException ex) {
                     U.error(ctx.log(getClass()), "Failed to send event 
notification to node: " + nodeId, ex);
                 }
-
-                if (recordIgniteEvt && notify) {
-                    ctx.event().record(new CacheQueryReadEvent<>(
-                        ctx.discovery().localNode(),
-                        "Continuous query executed.",
-                        EVT_CACHE_QUERY_OBJECT_READ,
-                        CacheQueryType.CONTINUOUS.name(),
-                        cacheName,
-                        null,
-                        null,
-                        null,
-                        rmtFilter,
-                        null,
-                        nodeId,
-                        taskName(),
-                        evt.getKey(),
-                        evt.getValue(),
-                        evt.getOldValue(),
-                        null
-                    ));
-                }
             }
 
             @Override public void onUnregister() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0295a56e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 3cd58d6..abbb807 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -20,13 +20,16 @@ package 
org.apache.ignite.internal.processors.cache.query.continuous;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
@@ -41,6 +44,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.store.CacheStore;
@@ -53,6 +57,8 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -65,6 +71,9 @@ import static 
org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
+import static 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
 
 /**
  *
@@ -77,13 +86,13 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     private static final int NODES = 5;
 
     /** */
-    private static final int KEYS = 10;
+    private static final int KEYS = 50;
 
     /** */
     private static final int VALS = 10;
 
     /** */
-    public static final int ITERATION_CNT = 1000;
+    public static final int ITERATION_CNT = 100;
 
     /** */
     private boolean client;
@@ -127,7 +136,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -140,7 +149,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -153,7 +162,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -166,7 +175,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -179,7 +188,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -192,7 +201,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -205,7 +214,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -218,7 +227,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -231,7 +240,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -244,7 +253,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -257,7 +266,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -270,7 +279,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -283,7 +292,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -296,7 +305,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -309,7 +318,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -322,7 +331,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -335,7 +344,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -348,7 +357,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, true);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -361,7 +370,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -374,7 +383,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -387,7 +396,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -400,7 +409,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -413,7 +422,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -426,7 +435,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -439,7 +448,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, false, true);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -452,7 +461,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_VALUES,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -465,7 +474,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -478,7 +487,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -491,7 +500,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -504,7 +513,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             OFFHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, true);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
@@ -517,7 +526,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -530,7 +539,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, null, false);
+        testContinuousQuery(ccfg, ALL);
     }
 
     /**
@@ -543,7 +552,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, false, false);
+        testContinuousQuery(ccfg, SERVER);
     }
 
     /**
@@ -556,43 +565,52 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             ONHEAP_TIERED,
             false);
 
-        testContinuousQuery(ccfg, true, false);
+        testContinuousQuery(ccfg, CLIENT);
     }
 
     /**
      * @param ccfg Cache configuration.
-     * @param client Client. If {@code null} then listener will be registered 
on all nodes.
-     * @param expTx Explicit tx.
+     * @param deploy The place where continuous query will be started.
      * @throws Exception If failed.
      */
-    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, 
Boolean client, boolean expTx)
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, 
ContinuousDeploy deploy)
         throws Exception {
         ignite(0).createCache(ccfg);
 
         try {
-            IgniteCache<Object, Object> cache = null;
-
-            if (client != null) {
-                if (client)
-                    cache = ignite(NODES - 1).cache(ccfg.getName());
-                else
-                    cache = ignite(0).cache(ccfg.getName());
-            }
-
             long seed = System.currentTimeMillis();
 
             Random rnd = new Random(seed);
 
             log.info("Random seed: " + seed);
 
-            final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue =
-                new ArrayBlockingQueue<>(50_000);
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new 
ArrayList<>();
 
             Collection<QueryCursor<?>> curs = new ArrayList<>();
 
-            if (cache != null) {
+            if (deploy == CLIENT) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new 
ArrayBlockingQueue<>(50_000);
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
+                    @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                        for (CacheEntryEvent<?, ?> evt : evts)
+                            evtsQueue.add(evt);
+                    }
+                });
+
+                evtsQueues.add(evtsQueue);
+
+                QueryCursor<?> cur = grid(NODES - 
1).cache(ccfg.getName()).query(qry);
+
+                curs.add(cur);
+            }
+            else if (deploy == SERVER) {
                 ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
+                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new 
ArrayBlockingQueue<>(50_000);
+
                 qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
                     @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                         for (CacheEntryEvent<?, ?> evt : evts)
@@ -600,7 +618,9 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                     }
                 });
 
-                QueryCursor<?> cur = cache.query(qry);
+                evtsQueues.add(evtsQueue);
+
+                QueryCursor<?> cur = grid(rnd.nextInt(NODES - 
1)).cache(ccfg.getName()).query(qry);
 
                 curs.add(cur);
             }
@@ -608,6 +628,8 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                 for (int i = 0; i < NODES - 1; i++) {
                     ContinuousQuery<Object, Object> qry = new 
ContinuousQuery<>();
 
+                    final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new 
ArrayBlockingQueue<>(50_000);
+
                     qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
                         @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                             for (CacheEntryEvent<?, ?> evt : evts)
@@ -615,12 +637,12 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                         }
                     });
 
+                    evtsQueues.add(evtsQueue);
+
                     QueryCursor<?> cur = 
ignite(i).cache(ccfg.getName()).query(qry);
 
                     curs.add(cur);
                 }
-
-                cache = ignite(ThreadLocalRandom.current().nextInt(NODES - 
1)).cache(ccfg.getName());
             }
 
             ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
@@ -629,10 +651,11 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
             try {
                 for (int i = 0; i < ITERATION_CNT; i++) {
-                    if (i % 100 == 0)
+                    if (i % 20 == 0)
                         log.info("Iteration: " + i);
 
-                    randomUpdate(rnd, evtsQueue, expData, partCntr, cache, 
expTx, curs.size());
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, 
grid(idx).cache(ccfg.getName()));
                 }
             }
             finally {
@@ -647,35 +670,31 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
     /**
      * @param rnd Random generator.
-     * @param evtsQueue Events queue.
+     * @param evtsQueues Events queues (one per registered continuous query listener).
      * @param expData Expected cache data.
      * @param partCntr Partition counter.
      * @param cache Cache.
-     * @param expTx Explicit TX.
-     * @param qryCnt Query count.
      * @throws Exception If failed.
      */
     private void randomUpdate(
         Random rnd,
-        BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
         ConcurrentMap<Object, Object> expData,
         Map<Integer, Long> partCntr,
-        IgniteCache<Object, Object> cache,
-        boolean expTx,
-        int qryCnt)
+        IgniteCache<Object, Object> cache)
         throws Exception {
         Object key = new QueryTestKey(rnd.nextInt(KEYS));
         Object newVal = value(rnd);
         Object oldVal = expData.get(key);
 
-        int op = rnd.nextInt(11);
+        int op = rnd.nextInt(13);
 
         Ignite ignite = cache.unwrap(Ignite.class);
 
         Transaction tx = null;
 
-        if (expTx && 
cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == 
TRANSACTIONAL)
-            tx = ignite.transactions().txStart();
+        if 
(cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == 
TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), 
txRandomIsolation(rnd));
 
         try {
             // log.info("Random operation [key=" + key + ", op=" + op + ']');
@@ -689,7 +708,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), 
key, newVal, oldVal, qryCnt);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
key, newVal, oldVal);
 
                     expData.put(key, newVal);
 
@@ -704,7 +723,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), 
key, newVal, oldVal, qryCnt);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
key, newVal, oldVal);
 
                     expData.put(key, newVal);
 
@@ -719,7 +738,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), 
key, null, oldVal, qryCnt);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
key, null, oldVal);
 
                     expData.remove(key);
 
@@ -734,7 +753,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), 
key, null, oldVal, qryCnt);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
key, null, oldVal);
 
                     expData.remove(key);
 
@@ -749,7 +768,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), 
key, newVal, oldVal, qryCnt);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
key, newVal, oldVal);
 
                     expData.put(key, newVal);
 
@@ -764,7 +783,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                     updatePartitionCounter(cache, key, partCntr);
 
-                    waitAndCheckEvent(evtsQueue, partCntr, affinity(cache), 
key, null, oldVal, qryCnt);
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
key, null, oldVal);
 
                     expData.remove(key);
 
@@ -780,12 +799,12 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                     if (oldVal == null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, 
affinity(cache), key, newVal, null, qryCnt);
+                        waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, null);
 
                         expData.put(key, newVal);
                     }
                     else
-                        checkNoEvent(evtsQueue);
+                        checkNoEvent(evtsQueues);
 
                     break;
                 }
@@ -799,12 +818,12 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                     if (oldVal == null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, 
affinity(cache), key, newVal, null, qryCnt);
+                        waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, null);
 
                         expData.put(key, newVal);
                     }
                     else
-                        checkNoEvent(evtsQueue);
+                        checkNoEvent(evtsQueues);
 
                     break;
                 }
@@ -818,12 +837,12 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                     if (oldVal != null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, 
affinity(cache), key, newVal, oldVal, qryCnt);
+                        waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, oldVal);
 
                         expData.put(key, newVal);
                     }
                     else
-                        checkNoEvent(evtsQueue);
+                        checkNoEvent(evtsQueues);
 
                     break;
                 }
@@ -837,12 +856,12 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                     if (oldVal != null) {
                         updatePartitionCounter(cache, key, partCntr);
 
-                        waitAndCheckEvent(evtsQueue, partCntr, 
affinity(cache), key, newVal, oldVal, qryCnt);
+                        waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, oldVal);
 
                         expData.put(key, newVal);
                     }
                     else
-                        checkNoEvent(evtsQueue);
+                        checkNoEvent(evtsQueues);
 
                     break;
                 }
@@ -861,7 +880,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
 
                             updatePartitionCounter(cache, key, partCntr);
 
-                            waitAndCheckEvent(evtsQueue, partCntr, 
affinity(cache), key, newVal, oldVal, qryCnt);
+                            waitAndCheckEvent(evtsQueues, partCntr, 
affinity(cache), key, newVal, oldVal);
 
                             expData.put(key, newVal);
                         }
@@ -871,7 +890,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                             if (tx != null)
                                 tx.commit();
 
-                            checkNoEvent(evtsQueue);
+                            checkNoEvent(evtsQueues);
                         }
                     }
                     else {
@@ -880,14 +899,57 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                         if (tx != null)
                             tx.commit();
 
-                        checkNoEvent(evtsQueue);
+                        checkNoEvent(evtsQueues);
                     }
 
                     break;
                 }
 
+                case 11: {
+                    SortedMap<Object, Object> vals = new TreeMap<>();
+
+                    while (vals.size() < KEYS / 5)
+                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), 
value(rnd));
+
+                    cache.putAll(vals);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    for (Map.Entry<Object, Object> e : vals.entrySet())
+                        updatePartitionCounter(cache, e.getKey(), partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
vals, expData);
+
+                    expData.putAll(vals);
+
+                    break;
+                }
+
+                case 12: {
+                    SortedMap<Object, Object> vals = new TreeMap<>();
+
+                    while (vals.size() < KEYS / 5)
+                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal);
+
+                    cache.invokeAll(vals.keySet(), new 
EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    for (Map.Entry<Object, Object> e : vals.entrySet())
+                        updatePartitionCounter(cache, e.getKey(), partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), 
vals, expData);
+
+                    for (Object o : vals.keySet())
+                        expData.put(o, newVal);
+
+                    break;
+                }
+
                 default:
-                    fail();
+                    fail("Op:" + op);
             }
         } finally {
             if (tx != null)
@@ -896,6 +958,83 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     }
 
     /**
+     * Polls one event per entry of {@code vals} from every queue and checks each received event
+     * (key, value, old value and partition update counter) against the expected state.
+     *
+     * @param evtsQueues Event queues to poll.
+     * @param partCntrs Expected partition update counters, keyed by partition.
+     * @param aff Affinity used to map a key to its partition.
+     * @param vals Values written by the batch operation.
+     * @param expData Expected cache data before the batch operation; used to look up old values.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        SortedMap<Object, Object> vals,
+        Map<Object, Object> expData)
+        throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>();
+
+            for (int i = 0; i < vals.size(); i++) {
+                CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+                // poll() returns null on timeout; fail with a message instead of an NPE on evt.getKey().
+                assertNotNull("Failed to wait for event [expected=" + vals.size() + ", received=" + i + ']', evt);
+
+                rcvEvts.put(evt.getKey(), evt);
+            }
+
+            assertEquals(vals.size(), rcvEvts.size());
+
+            for (Map.Entry<Object, Object> e : vals.entrySet()) {
+                Object key = e.getKey();
+                Object val = e.getValue();
+                Object oldVal = expData.get(key);
+
+                // NOTE(review): this returns from the whole method, skipping the remaining keys and queues;
+                // confirm that is intended rather than skipping only this key.
+                if (val == null && oldVal == null) {
+                    checkNoEvent(evtsQueues);
+
+                    return;
+                }
+
+                CacheEntryEvent evt = rcvEvts.get(key);
+
+                assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']',
+                    evt);
+                assertEquals(key, evt.getKey());
+                assertEquals(val, evt.getValue());
+                assertEquals(oldVal, evt.getOldValue());
+
+                // Keep the boxed value so a missing counter is reported by the assertion, not by an unboxing NPE.
+                Long cntr = partCntrs.get(aff.partition(key));
+                CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class);
+
+                assertNotNull("No partition counter [key=" + key + ']', cntr);
+                assertNotNull(qryEntryEvt);
+
+                assertEquals(cntr.longValue(), qryEntryEvt.getPartitionUpdateCounter());
+            }
+        }
+    }
+
+    /**
+     * Picks one of the three transaction isolation levels with equal probability.
+     *
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        double val = rnd.nextDouble();
+
+        // Thresholds must be floating-point: the integer expressions 1/3 and 2/3 both evaluate to 0,
+        // which made every call fall through to SERIALIZABLE.
+        if (val < 1.0 / 3)
+            return TransactionIsolation.READ_COMMITTED;
+        else if (val < 2.0 / 3)
+            return TransactionIsolation.REPEATABLE_READ;
+        else
+            return TransactionIsolation.SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd Random number generator.
+     * @return Concurrency mode selected by a single {@code nextBoolean()} draw.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        if (rnd.nextBoolean())
+            return TransactionConcurrency.OPTIMISTIC;
+
+        return TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
      * @param cache Cache.
      * @param key Key
      * @param cntrs Partition counters.
@@ -922,28 +1061,28 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     }
 
     /**
-     * @param evtsQueue Event queue.
+     * @param evtsQueues Event queue.
      * @param partCntrs Partition counters.
      * @param aff Affinity function.
      * @param key Key.
      * @param val Value.
      * @param oldVal Old value.
-     * @param qryCnt Query count.
      * @throws Exception If failed.
      */
-    private void waitAndCheckEvent(BlockingQueue<CacheEntryEvent<?, ?>> 
evtsQueue,
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> 
evtsQueues,
         Map<Integer, Long> partCntrs,
         Affinity<Object> aff,
         Object key,
         Object val,
-        Object oldVal, int qryCnt) throws Exception {
+        Object oldVal)
+        throws Exception {
         if (val == null && oldVal == null) {
-            checkNoEvent(evtsQueue);
+            checkNoEvent(evtsQueues);
 
             return;
         }
 
-        for (int i = 0; i < qryCnt; i++) {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
             CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
 
             assertNotNull("Failed to wait for event [key=" + key + ", val=" + 
val + ", oldVal=" + oldVal + ']', evt);
@@ -951,21 +1090,26 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             assertEquals(val, evt.getValue());
             assertEquals(oldVal, evt.getOldValue());
 
-            Long cntr = partCntrs.get(aff.partition(key));
+            long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = 
evt.unwrap(CacheQueryEntryEvent.class);
 
             assertNotNull(cntr);
-            assertEquals(cntr, evt.unwrap(Long.class));
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
         }
     }
 
     /**
-     * @param evtsQueue Event queue.
+     * @param evtsQueues Event queues; each one is polled for up to 50 ms and must not yield an event.
      * @throws Exception If failed.
      */
-    private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) 
throws Exception {
-        CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> 
evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
 
-        assertNull(evt);
+            assertNull(evt);
+        }
     }
 
     /**
@@ -1029,7 +1173,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     /**
      *
      */
-    static class QueryTestKey implements Serializable {
+    static class QueryTestKey implements Serializable, Comparable {
         /** */
         private final Integer key;
 
@@ -1062,6 +1206,11 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
         @Override public String toString() {
             return S.toString(QueryTestKey.class, this);
         }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            // Integer.compareTo avoids the overflow a subtraction-based comparison can hit for distant values.
+            return key.compareTo(((QueryTestKey)o).key);
+        }
     }
 
     /**
@@ -1146,4 +1295,11 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             return S.toString(EntrySetValueProcessor.class, this);
         }
     }
+
+    /**
+     * Deployment options with three values: {@code CLIENT}, {@code SERVER} and {@code ALL}.
+     * NOTE(review): judging by the name, this presumably selects which node types the test starts
+     * continuous queries on; confirm against usages.
+     */
+    protected enum ContinuousDeploy {
+        CLIENT, SERVER, ALL
+    }
 }

Reply via email to