Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc [created] 3cc4f9f51


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cc4f9f5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cc4f9f5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cc4f9f5

Branch: refs/heads/ignite-5075-cc
Commit: 3cc4f9f518b45823c91b266269df72c5486c7b89
Parents: c04b39a
Author: sboikov <[email protected]>
Authored: Wed May 24 12:10:43 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed May 24 14:03:11 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |   1 -
 .../continuous/CacheContinuousQueryEntry.java   |  31 +--
 .../CacheContinuousQueryEventBuffer.java        | 272 +++++++++++++++++++
 .../continuous/CacheContinuousQueryHandler.java | 196 ++++---------
 .../continuous/GridContinuousBatchAdapter.java  |   2 +-
 .../continuous/GridContinuousProcessor.java     |  12 +-
 .../continuous/GridContinuousQueryBatch.java    |  16 +-
 ...nuousQueryConcurrentPartitionUpdateTest.java | 177 ++++++++++++
 .../CacheContinuousQueryEventBufferTest.java    | 227 ++++++++++++++++
 ...eCacheContinuousQueryImmutableEntryTest.java |   2 -
 .../IgniteCacheQuerySelfTestSuite3.java         |   5 +
 11 files changed, 763 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index edf90d0..30c2a33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index bf2a691..e40f83e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -105,12 +105,12 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     @GridToStringInclude
     private AffinityTopologyVersion topVer;
 
-    /** Filtered events. */
-    private GridLongList filteredEvts;
-
     /** Keep binary. */
     private boolean keepBinary;
 
+    /** */
+    public long filteredCnt;
+
     /**
      * Required by {@link Message}.
      */
@@ -207,13 +207,6 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     }
 
     /**
-     * @return Size include this event and filtered.
-     */
-    public int size() {
-        return filteredEvts != null ? filteredEvts.size() + 1 : 1;
-    }
-
-    /**
      * @return If entry filtered then will return light-weight <i><b>new 
entry</b></i> without values and key
      * (avoid to huge memory consumption), otherwise {@code this}.
      */
@@ -251,20 +244,6 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
     }
 
     /**
-     * @param cntrs Filtered events.
-     */
-    void filteredEvents(GridLongList cntrs) {
-        filteredEvts = cntrs;
-    }
-
-    /**
-     * @return previous filtered events.
-     */
-    long[] filteredEvents() {
-        return filteredEvts == null ? null : filteredEvts.array();
-    }
-
-    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */
@@ -363,7 +342,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage("filteredEvts", filteredEvts))
+                if (!writer.writeLong("filteredCnt", filteredCnt))
                     return false;
 
                 writer.incrementState();
@@ -446,7 +425,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 2:
-                filteredEvts = reader.readMessage("filteredEvts");
+                filteredCnt = reader.readLong("filteredCnt");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
new file mode 100644
index 0000000..b7b3267
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Orders events by update counter in fixed-size batches and counts the filtered events that precede each passed entry.
+ */
+public class CacheContinuousQueryEventBuffer {
+    /** Number of consecutive counters covered by one batch. */
+    private static final int BUF_SIZE = 5;
+
+    /** Batch currently accepting events; holds {@code null} until the first batch is created. */
+    private AtomicReference<Batch> curBatch = new AtomicReference<>();
+
+    /** Events whose counter is greater than the end counter of the batch they were checked against, keyed by counter. */
+    private ConcurrentSkipListMap<Long, Object> pending = new 
ConcurrentSkipListMap<>();
+
+    /**
+     * @return Counter the first batch starts right after; {@code -1} makes {@code initBatch()} return {@code null}.
+     */
+    protected long currentPartitionCounter() {
+        return 0;
+    }
+
+    /**
+     * For test purpose only.
+     *
+     * @return Filtered events counter of the current batch, {@code 0} if no batch was created yet.
+     */
+    long currentFiltered() {
+        Batch batch = curBatch.get();
+
+        return batch != null ? batch.filtered : 0;
+    }
+
+    /**
+     * @param e Entry to process, it is positioned by its update counter.
+     * @return Collected entries to pass to listener (single entry or entries list),
+     *      {@code null} if nothing was collected by this call.
+     */
+    @Nullable Object processEntry(CacheContinuousQueryEntry e) {
+        return process0(e.updateCounter(), e);
+    }
+
+    /**
+     * @param cntr Filtered counter, it is stored as a {@code Long} marker instead of an entry.
+     * @return Collected entries to pass to listener (single entry or entries list),
+     *      {@code null} if nothing was collected by this call.
+     */
+    @Nullable Object processFiltered(long cntr) {
+        return process0(cntr, cntr);
+    }
+
+    /**
+     * @param cntr Entry counter.
+     * @param entry Entry or {@code Long} marker of a filtered counter.
+     * @return Collected entries; {@code entry} itself if there is no batch or the counter precedes the batch start.
+     */
+    private Object process0(long cntr, Object entry) {
+        assert cntr >= 0 : cntr;
+
+        Batch batch = initBatch();
+
+        if (batch == null || cntr < batch.startCntr) {
+            assert entry != null : cntr;
+
+            return entry;
+        }
+
+        Object res = null;
+
+        if (cntr <= batch.endCntr)
+            res = batch.processEvent0(null, cntr, entry);
+        else
+            pending.put(cntr, entry); // Beyond this batch: keep it until a batch covering the counter is current.
+
+        Batch batch0 = curBatch.get();
+
+        if (batch0 != batch) { // Current batch changed since it was read above: feed it from pending.
+            do {
+                batch = batch0;
+
+                res = processPending(res, batch);
+
+                batch0 = curBatch.get();
+            }
+            while (batch != batch0); // Repeat until the current batch stops changing.
+        }
+
+        return res;
+    }
+
+    /**
+     * @return Current batch, created on first use; {@code null} if no batch exists and the partition counter is -1.
+     */
+    @Nullable private Batch initBatch() {
+        Batch batch = curBatch.get();
+
+        if (batch != null)
+            return batch;
+
+        long curCntr = currentPartitionCounter();
+
+        if (curCntr == -1)
+            return null;
+
+        batch = new Batch(curCntr + 1, 0L, new Object[BUF_SIZE]); // First batch starts right after the current counter.
+
+        if (curBatch.compareAndSet(null, batch))
+            return batch;
+
+        return curBatch.get(); // CAS failed: another batch was installed concurrently, use that one.
+    }
+
+    /**
+     * @param res Current result.
+     * @param batch Batch to feed with pending events whose counter is not greater than its end counter.
+     * @return New result.
+     */
+    @Nullable private Object processPending(@Nullable Object res, Batch batch) 
{
+        if (pending.floorKey(batch.endCntr) != null) {
+            for (Map.Entry<Long, Object> p : pending.headMap(batch.endCntr, 
true).entrySet()) {
+                long cntr = p.getKey();
+
+                assert cntr >= batch.startCntr : cntr;
+
+                if (cntr <= batch.endCntr && pending.remove(p.getKey()) != 
null)
+                    res = batch.processEvent0(res, p.getKey(), p.getValue());
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Window of {@code BUF_SIZE} consecutive counters starting at {@code startCntr}.
+     */
+    private class Batch {
+        /** Filtered events counted since the last non-filtered entry was added to a result. */
+        private long filtered;
+
+        /** First counter of this batch. */
+        private final long startCntr;
+
+        /** Last counter of this batch (inclusive). */
+        private final long endCntr;
+
+        /** Index in the events array of the last processed event, {@code -1} if none was processed. */
+        private int lastProc = -1;
+
+        /** Events of this batch, indexed by {@code counter - startCntr}. */
+        private final Object[] evts;
+
+        /**
+         * @param startCntr Start counter.
+         * @param filtered Number of filtered events before this batch.
+         * @param evts Events array.
+         */
+        Batch(long startCntr, long filtered, Object[] evts) {
+            assert startCntr >= 0;
+            assert filtered >= 0;
+
+            this.startCntr = startCntr;
+            this.filtered = filtered;
+            this.evts = evts;
+
+            endCntr = startCntr + BUF_SIZE - 1;
+        }
+
+        /**
+         * @param res Current result: {@code null}, single entry or entries list.
+         * @param cntr Event counter, callers pass counters between the start and end counters of this batch.
+         * @param evt Entry or {@code Long} marker of a filtered counter.
+         * @return New result.
+         */
+        @SuppressWarnings("unchecked")
+        @Nullable private Object processEvent0(
+            @Nullable Object res,
+            long cntr,
+            Object evt) {
+            int pos = (int)(cntr - startCntr);
+
+            synchronized (this) {
+                evts[pos] = evt;
+
+                int next = lastProc + 1;
+
+                if (next == pos) { // This event is the next expected one: consume the contiguous run of stored events.
+                    for (int i = next; i < evts.length; i++) {
+                        Object e = evts[i];
+
+                        if (e != null) {
+                            if (e.getClass() == Long.class)
+                                filtered++;
+                            else {
+                                CacheContinuousQueryEntry evt0 = 
(CacheContinuousQueryEntry)e;
+
+                                if (!evt0.isFiltered()) {
+                                    evt0.filteredCnt = filtered; // Attach the filtered events counted before this entry.
+
+                                    filtered = 0;
+
+                                    if (res == null)
+                                        res = evt0;
+                                    else {
+                                        List<CacheContinuousQueryEntry> 
resList;
+
+                                        if (res instanceof 
CacheContinuousQueryEntry) {
+                                            resList = new ArrayList<>();
+
+                                            
resList.add((CacheContinuousQueryEntry)res);
+                                        }
+                                        else {
+                                            assert res instanceof List : res;
+
+                                            resList = 
(List<CacheContinuousQueryEntry>)res;
+                                        }
+
+                                        resList.add(evt0);
+
+                                        res = resList;
+                                    }
+                                }
+                                else
+                                    filtered++;
+                            }
+
+                            pos = i;
+                        }
+                        else
+                            break;
+                    }
+
+                    lastProc = pos;
+                }
+                else
+                    return res; // Not the next expected position: the event stays stored, nothing is collected now.
+            }
+
+            if (pos == evts.length -1) { // Last position was processed: install the following batch as current.
+                Arrays.fill(evts, null); // The same array is handed over to the next batch.
+
+                Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, 
filtered, evts); // NOTE(review): 'filtered' is read outside the synchronized block above - confirm this is safe.
+
+                curBatch.set(nextBatch);
+            }
+
+            return res;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e7706dd..ab70f81 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -28,13 +28,11 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -42,6 +40,7 @@ import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
@@ -59,6 +58,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
@@ -67,13 +67,12 @@ import 
org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import 
org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
 import 
org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
-import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
@@ -94,7 +93,12 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     private static final long serialVersionUID = 0L;
 
     /** */
-    private static final int BACKUP_ACK_THRESHOLD = 100;
+    private static final int BACKUP_ACK_THRESHOLD =
+        
IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD",
 100);
+
+    /** */
+    private static final int LSNR_MAX_BUF_SIZE =
+        
IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE",
 10_000);
 
     /** Cache name. */
     private String cacheName;
@@ -145,7 +149,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
 
     /** */
-    private transient ConcurrentMap<Integer, EntryBuffer> entryBufs;
+    private transient ConcurrentMap<Integer, CacheContinuousQueryEventBuffer> 
entryBufs;
 
     /** */
     private transient AcknowledgeBuffer ackBuf;
@@ -811,13 +815,13 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                 if (!entry.isFiltered())
                     prepareEntry(cctx, nodeId, entry);
 
-                CacheContinuousQueryEntry e = handleEntry(entry);
+                Object entryOrList = handleEntry(cctx, entry);
 
-                if (e != null) {
+                if (entryOrList != null) {
                     if (log.isDebugEnabled())
-                        log.debug("Send the following event to listener: " + 
e);
+                        log.debug("Send the following event to listener: " + 
entryOrList);
 
-                    ctx.continuous().addNotification(nodeId, routineId, entry, 
topic, sync, true);
+                    ctx.continuous().addNotification(nodeId, routineId, 
entryOrList, topic, sync, true);
                 }
             }
         }
@@ -918,10 +922,11 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     }
 
     /**
+     * @param cctx Cache context.
      * @param e Entry.
      * @return Entry.
      */
-    private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) 
{
+    private Object handleEntry(final GridCacheContext cctx, 
CacheContinuousQueryEntry e) {
         assert e != null;
         assert entryBufs != null;
 
@@ -934,21 +939,32 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
 
         // Initial query entry.
         // This events should be fired immediately.
-        if (e.updateCounter() == -1)
+        if (e.updateCounter() == -1L)
             return e;
 
-        EntryBuffer buf = entryBufs.get(e.partition());
+        CacheContinuousQueryEventBuffer buf = entryBufs.get(e.partition());
 
         if (buf == null) {
-            buf = new EntryBuffer();
+            final int part = e.partition();
 
-            EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
+            buf = new CacheContinuousQueryEventBuffer() {
+                @Override protected long currentPartitionCounter() {
+                    GridDhtLocalPartition locPart = 
cctx.topology().localPartition(part, null, false);
 
-            if (oldRec != null)
-                buf = oldRec;
+                    if (locPart == null)
+                        return -1L;
+
+                    return locPart.updateCounter();
+                }
+            };
+
+            CacheContinuousQueryEventBuffer oldBuf = 
entryBufs.putIfAbsent(e.partition(), buf);
+
+            if (oldBuf != null)
+                buf = oldBuf;
         }
 
-        return buf.handle(e);
+        return buf.processEntry(e);
     }
 
     /**
@@ -959,7 +975,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         private static final CacheContinuousQueryEntry HOLE = new 
CacheContinuousQueryEntry();
 
         /** */
-        private final static int MAX_BUFF_SIZE = 100;
+        private final static int MAX_BUFF_SIZE = LSNR_MAX_BUF_SIZE;
 
         /** */
         private IgniteLogger log;
@@ -1084,11 +1100,14 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                 if (entry.updateCounter() > lastFiredEvt) {
                     pendingEvts.put(entry.updateCounter(), entry);
 
-                    // Put filtered events.
-                    if (entry.filteredEvents() != null) {
-                        for (long cnrt : entry.filteredEvents()) {
-                            if (cnrt > lastFiredEvt)
-                                pendingEvts.put(cnrt, HOLE);
+                    // TODO
+                    if (entry.filteredCnt > 0) {
+                        long filteredCntr = entry.updateCounter() - 
entry.filteredCnt;
+
+                        for (long i = 0; i < entry.filteredCnt; i++) {
+                            pendingEvts.put(filteredCntr, HOLE);
+
+                            filteredCntr++;
                         }
                     }
                 }
@@ -1115,6 +1134,17 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
                 entries = new ArrayList<>();
 
                 if (pendingEvts.size() >= MAX_BUFF_SIZE) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Pending events reached max of buffer size 
[lastFiredEvt=" + lastFiredEvt +
+                            ", curTop=" + curTop +
+                            ", entUpdCnt=" + entry.updateCounter() +
+                            ", partId=" + entry.partition() +
+                            ", pendingEvts=" + pendingEvts + ']');
+                    }
+
+                    LT.warn(log, "Pending events reached max of buffer size 
[cache=" + cctx.name() +
+                        ", partId=" + entry.partition() + ']');
+
                     for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); 
i++) {
                         Map.Entry<Long, CacheContinuousQueryEntry> e = 
iter.next();
 
@@ -1125,14 +1155,6 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
 
                         iter.remove();
                     }
-
-                    if (log.isDebugEnabled()) {
-                        log.debug("Pending events reached max of buffer size 
[lastFiredEvt=" + lastFiredEvt +
-                            ", curTop=" + curTop +
-                            ", entUpdCnt=" + entry.updateCounter() +
-                            ", partId=" + entry.partition() +
-                            ", pendingEvts=" + pendingEvts + ']');
-                    }
                 }
                 else {
                     // Elements are consistently.
@@ -1166,116 +1188,6 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
         }
     }
 
-    /**
-     *
-     */
-    private static class EntryBuffer {
-        /** */
-        private final static int MAX_BUFF_SIZE = 100;
-
-        /** */
-        private final GridConcurrentSkipListSet<Long> buf = new 
GridConcurrentSkipListSet<>();
-
-        /** */
-        private AtomicLong lastFiredCntr = new AtomicLong();
-
-        /**
-         * @param newVal New value.
-         * @return Old value if previous value less than new value otherwise 
{@code -1}.
-         */
-        private long updateFiredCounter(long newVal) {
-            long prevVal = lastFiredCntr.get();
-
-            while (prevVal < newVal) {
-                if (lastFiredCntr.compareAndSet(prevVal, newVal))
-                    return prevVal;
-                else
-                    prevVal = lastFiredCntr.get();
-            }
-
-            return prevVal >= newVal ? -1 : prevVal;
-        }
-
-        /**
-         * Add continuous entry.
-         *
-         * @param e Cache continuous query entry.
-         * @return Collection entries which will be fired.
-         */
-        public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
-            assert e != null;
-
-            if (e.isFiltered()) {
-                Long last = buf.lastx();
-                Long first = buf.firstx();
-
-                if (last != null && first != null && last - first >= 
MAX_BUFF_SIZE) {
-                    NavigableSet<Long> prevHoles = buf.subSet(first, true, 
last, true);
-
-                    GridLongList filteredEvts = new GridLongList((int)(last - 
first));
-
-                    int size = 0;
-
-                    Long cntr;
-
-                    while ((cntr = prevHoles.pollFirst()) != null) {
-                        filteredEvts.add(cntr);
-
-                        ++size;
-                    }
-
-                    filteredEvts.truncate(size, true);
-
-                    e.filteredEvents(filteredEvts);
-
-                    return e;
-                }
-
-                if (lastFiredCntr.get() > e.updateCounter() || 
e.updateCounter() == 1)
-                    return e;
-                else {
-                    buf.add(e.updateCounter());
-
-                    // Double check. If another thread sent a event with 
counter higher than this event.
-                    if (lastFiredCntr.get() > e.updateCounter() && 
buf.contains(e.updateCounter())) {
-                        buf.remove(e.updateCounter());
-
-                        return e;
-                    }
-                    else
-                        return null;
-                }
-            }
-            else {
-                long prevVal = updateFiredCounter(e.updateCounter());
-
-                if (prevVal == -1)
-                    return e;
-                else {
-                    NavigableSet<Long> prevHoles = buf.subSet(prevVal, true, 
e.updateCounter(), true);
-
-                    GridLongList filteredEvts = new 
GridLongList((int)(e.updateCounter() - prevVal));
-
-                    int size = 0;
-
-                    Long cntr;
-
-                    while ((cntr = prevHoles.pollFirst()) != null) {
-                        filteredEvts.add(cntr);
-
-                        ++size;
-                    }
-
-                    filteredEvts.truncate(size, true);
-
-                    e.filteredEvents(filteredEvts);
-
-                    return e;
-                }
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void onNodeLeft() {
         Collection<CacheContinuousQueryEntry> backupQueue0 = backupQueue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
index 4540de1..597eae8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -25,7 +25,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 public class GridContinuousBatchAdapter implements GridContinuousBatch {
     /** Buffer. */
-    private final ConcurrentLinkedDeque8<Object> buf = new 
ConcurrentLinkedDeque8<>();
+    protected final ConcurrentLinkedDeque8<Object> buf = new 
ConcurrentLinkedDeque8<>();
 
     /** {@inheritDoc} */
     @Override public void add(Object obj) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index abcd8ea..da951f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -897,7 +897,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         throws IgniteCheckedException {
         assert nodeId != null;
         assert routineId != null;
-        assert !msg || obj instanceof Message : obj;
+        assert !msg || (obj instanceof Message || obj instanceof Collection) : 
obj;
 
         assert !nodeId.equals(ctx.localNodeId());
 
@@ -917,7 +917,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), 
null, msg, null);
+                    sendNotification(nodeId,
+                        routineId,
+                        futId,
+                        obj instanceof Collection ? (Collection)obj : 
F.asList(obj),
+                        null,
+                        msg,
+                        null);
 
                     info.hnd.onBatchAcknowledged(routineId, info.add(obj), 
ctx);
                 }
@@ -1563,7 +1569,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         GridContinuousBatch addAll(Collection<?> objs) {
             assert objs != null;
 
-            GridContinuousBatch toSnd = null;
+            GridContinuousBatch toSnd;
 
             lock.writeLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
index c5d854b..0eba44b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousQueryBatch.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.continuous;
 
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
@@ -31,11 +32,20 @@ public class GridContinuousQueryBatch extends 
GridContinuousBatchAdapter {
     /** {@inheritDoc} */
     @Override public void add(Object obj) {
         assert obj != null;
-        assert obj instanceof CacheContinuousQueryEntry;
+        assert obj instanceof CacheContinuousQueryEntry || obj instanceof List;
 
-        super.add(obj);
+        if (obj instanceof CacheContinuousQueryEntry) {
+            buf.add(obj);
 
-        size.addAndGet(((CacheContinuousQueryEntry)obj).size());
+            size.incrementAndGet();
+        }
+        else {
+            List<Object> objs = (List<Object>)obj;
+
+            buf.addAll(objs);
+
+            size.addAndGet(objs.size());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
new file mode 100644
index 0000000..5cdcc98
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
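+ * Checks that continuous query events are neither lost nor duplicated under concurrent updates of one partition.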
+ */
+public class CacheContinuousQueryConcurrentPartitionUpdateTest extends 
GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node started by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** If {@code true}, node configurations are created with client mode enabled. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionAtomic() throws Exception {
+        concurrentUpdatePartition(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdatePartitionTx() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdatePartition(CacheAtomicityMode atomicityMode) 
throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        Ignite client = startGrid(1);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+
+        IgniteCache clientCache = client.createCache(ccfg);
+
+        final AtomicInteger evtCnt = new AtomicInteger();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> 
evts) {
+                for (CacheEntryEvent evt : evts) {
+                    assertNotNull(evt.getKey());
+                    assertNotNull(evt.getValue());
+
+                    evtCnt.incrementAndGet();
+                }
+            }
+        });
+
+        clientCache.query(qry);
+
+        Affinity<Integer> aff = srv.affinity(DEFAULT_CACHE_NAME);
+
+        final List<Integer> keys = new ArrayList<>();
+
+        final int KEYS = 10;
+
+        for (int i = 0; i < 100_000; i++) {
+            if (aff.partition(i) == 0) {
+                keys.add(i);
+
+                if (keys.size() == KEYS)
+                    break;
+            }
+        }
+
+        assertEquals(KEYS, keys.size());
+
+        final int THREADS = 10;
+        final int UPDATES = 1000;
+
+        final IgniteCache<Object, Object> srvCache = 
srv.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 100; i++) {
+            log.info("Iteration: " + i);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < UPDATES; i++)
+                        srvCache.put(keys.get(rnd.nextInt(KEYS)), i);
+
+                    return null;
+                }
+            }, THREADS, "update");
+
+            log.info("Finished update");
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    log.info("Events: " + evtCnt.get());
+
+                    return evtCnt.get() >= THREADS * UPDATES;
+                }
+            }, 5000);
+
+            assertEquals(THREADS * UPDATES, evtCnt.get());
+
+            evtCnt.set(0);
+        }
+    }
+
+    public void testConcurrentUpdateAndQueryStart() throws Exception {
+        // TODO: body is empty, so this test verifies nothing yet; the scenario named by the method is not covered.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
new file mode 100644
index 0000000..75a664c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBufferTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
+import javax.cache.event.EventType;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
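+ * Tests {@link CacheContinuousQueryEventBuffer} with shuffled entries and filtered counters, in one or many threads.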
+ */
+@SuppressWarnings("unchecked")
+public class CacheContinuousQueryEventBufferTest extends 
GridCommonAbstractTest {
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBuffer1() throws Exception {
+        testBuffer(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBuffer2() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            testBuffer(10);
+        }
+    }
+
+    /**
+     * @param threads Threads number.
+     * @throws Exception If failed.
+     */
+    private void testBuffer(int threads) throws Exception {
+        long seed = System.nanoTime();
+
+        Random rnd = new Random(seed);
+
+        log.info("Start test, seed: " + seed);
+
+        for (int i = 0; i < 10; i++) {
+            int cnt = rnd.nextInt(10_000) + 1;
+
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 
0.5f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 
0.9f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 
0.99f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 
0.01f, threads);
+            testBuffer(rnd, new CacheContinuousQueryEventBuffer(), cnt, 1, 
0.f, threads);
+        }
+
+        CacheContinuousQueryEventBuffer b = new 
CacheContinuousQueryEventBuffer();
+
+        long cntr = 1;
+
+        for (int i = 0; i < 10; i++) {
+            int cnt = rnd.nextInt(10_000) + 1;
+            float ratio = rnd.nextFloat();
+
+            testBuffer(rnd, b, cnt, cntr, ratio, threads);
+
+            cntr += cnt;
+        }
+    }
+
+    /**
+     * @param rnd Random.
+     * @param b Buffer.
+     * @param cnt Entries count.
+     * @param cntr Current counter.
+     * @param filterRatio Filtered events ratio.
+     * @param threads Threads number.
+     * @throws Exception If failed.
+     */
+    private void testBuffer(Random rnd,
+        final CacheContinuousQueryEventBuffer b,
+        int cnt,
+        long cntr,
+        float filterRatio,
+        int threads)
+        throws Exception
+    {
+        List<CacheContinuousQueryEntry> expEntries = new ArrayList<>();
+
+        List<Object> entries = new ArrayList<>();
+
+        long filtered = b.currentFiltered();
+
+        for (int i = 0; i < cnt; i++) {
+            if (rnd.nextFloat() < filterRatio) {
+                entries.add(cntr);
+
+                cntr++;
+
+                filtered++;
+            }
+            else {
+                CacheContinuousQueryEntry entry = new 
CacheContinuousQueryEntry(
+                    0,
+                    EventType.CREATED,
+                    null,
+                    null,
+                    null,
+                    false,
+                    0,
+                    cntr,
+                    null);
+
+                entries.add(entry);
+
+                CacheContinuousQueryEntry expEntry = new 
CacheContinuousQueryEntry(
+                    0,
+                    EventType.CREATED,
+                    null,
+                    null,
+                    null,
+                    false,
+                    0,
+                    cntr,
+                    null);
+
+                expEntry.filteredCnt = filtered;
+
+                cntr++;
+
+                expEntries.add(expEntry);
+
+                filtered = 0;
+            }
+        }
+
+        Collections.shuffle(entries, rnd);
+
+        List<CacheContinuousQueryEntry> actualEntries = new 
ArrayList<>(expEntries.size());
+
+        if (threads == 1) {
+            for (int i = 0; i < entries.size(); i++) {
+                Object o = entries.get(i);
+
+                Object res;
+
+                if (o instanceof Long)
+                    res = b.processFiltered((Long)o);
+                else
+                    res = b.processEntry((CacheContinuousQueryEntry)o);
+
+                if (res != null) {
+                    if (res instanceof CacheContinuousQueryEntry)
+                        actualEntries.add((CacheContinuousQueryEntry)res);
+                    else
+                        
actualEntries.addAll((List<CacheContinuousQueryEntry>)res);
+                }
+            }
+        }
+        else {
+            final CyclicBarrier barrier = new CyclicBarrier(threads);
+
+            final ConcurrentLinkedQueue<Object> q = new 
ConcurrentLinkedQueue<>(entries);
+
+            final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> act0 
= new ConcurrentSkipListMap<>();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    barrier.await();
+
+                    Object o;
+
+                    while ((o = q.poll()) != null) {
+                        Object res;
+
+                        if (o instanceof Long)
+                            res = b.processFiltered((Long)o);
+                        else
+                            res = b.processEntry((CacheContinuousQueryEntry)o);
+
+                        if (res != null) {
+                            if (res instanceof CacheContinuousQueryEntry)
+                                
act0.put(((CacheContinuousQueryEntry)res).updateCounter(), 
(CacheContinuousQueryEntry)res);
+                            else {
+                                for (CacheContinuousQueryEntry e : 
((List<CacheContinuousQueryEntry>)res))
+                                    act0.put(e.updateCounter(), e);
+                            }
+                        }
+                    }
+
+                    return null;
+                }
+            }, threads, "test");
+
+            actualEntries.addAll(act0.values());
+        }
+
+        assertEquals(expEntries.size(), actualEntries.size());
+
+        for (int i = 0; i < expEntries.size(); i++) {
+            CacheContinuousQueryEntry expEvt = expEntries.get(i);
+            CacheContinuousQueryEntry actualEvt = actualEntries.get(i);
+
+            assertEquals(expEvt.updateCounter(), actualEvt.updateCounter());
+            assertEquals(expEvt.filteredCnt, actualEvt.filteredCnt);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index b91217f..d230320 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -140,7 +140,6 @@ public class IgniteCacheContinuousQueryImmutableEntryTest 
extends GridCommonAbst
             1L,
             new AffinityTopologyVersion(1L));
 
-        e0.filteredEvents(new GridLongList(new long[]{1L, 2L}));
         e0.markFiltered();
 
         ByteBuffer buf = ByteBuffer.allocate(4096);
@@ -156,7 +155,6 @@ public class IgniteCacheContinuousQueryImmutableEntryTest 
extends GridCommonAbst
         assertEquals(e0.cacheId(), e1.cacheId());
         assertEquals(e0.eventType(), e1.eventType());
         assertEquals(e0.isFiltered(), e1.isFiltered());
-        assertEquals(GridLongList.asList(e0.filteredEvents()), 
GridLongList.asList(e1.filteredEvents()));
         assertEquals(e0.isBackup(), e1.isBackup());
         assertEquals(e0.isKeepBinary(), e1.isKeepBinary());
         assertEquals(e0.partition(), e1.partition());

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cc4f9f5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 8dd273a..0084cdc 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -21,6 +21,8 @@ import junit.framework.TestSuite;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBufferTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest;
@@ -118,6 +120,9 @@ public class IgniteCacheQuerySelfTestSuite3 extends 
TestSuite {
         suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class);
         suite.addTestSuite(ClientReconnectContinuousQueryTest.class);
 
+        
suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class);
+        suite.addTestSuite(CacheContinuousQueryEventBufferTest.class);
+
         suite.addTest(IgniteDistributedJoinTestSuite.suite());
 
         return suite;

Reply via email to