This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b65411e  IGNITE-14402 Java thin client: Continuous queries support - 
Fixes #8960.
b65411e is described below

commit b65411e68105c36dc1220d850d3f595df2f8af4b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Apr 8 13:31:08 2021 +0300

    IGNITE-14402 Java thin client: Continuous queries support - Fixes #8960.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../clients/AbstractClientCompatibilityTest.java   |   8 +
 .../clients/JavaThinCompatibilityTest.java         |  26 +
 .../java/org/apache/ignite/client/ClientCache.java |  62 +-
 .../ClientDisconnectListener.java}                 |  19 +-
 .../thin/ClientCacheEntryListenerHandler.java      | 265 +++++++
 .../thin/ClientCacheEntryListenersRegistry.java    |  61 ++
 .../client/thin/ClientContinuousQueryCursor.java   |  61 ++
 .../internal/client/thin/ClientJCacheAdapter.java  | 194 ++++++
 .../thin/ClientJCacheEntryListenerAdapter.java     |  70 ++
 .../client/thin/ClientNotificationType.java        |  21 +-
 .../internal/client/thin/ClientOperation.java      |   2 +
 .../internal/client/thin/TcpClientCache.java       | 128 +++-
 .../internal/client/thin/TcpClientChannel.java     |  16 +-
 .../internal/client/thin/TcpIgniteClient.java      |  23 +-
 .../org/apache/ignite/client/ReliabilityTest.java  |  14 -
 .../client/thin/AbstractThinClientTest.java        |  22 +
 .../client/thin/CacheEntryListenersTest.java       | 774 +++++++++++++++++++++
 .../internal/client/thin/ComputeTaskTest.java      |  17 -
 .../internal/client/thin/ReliableChannelTest.java  |   2 +-
 .../org/apache/ignite/client/ClientTestSuite.java  |   2 +
 20 files changed, 1725 insertions(+), 62 deletions(-)

diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index 07ba238..c12cf1c 100644
--- 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -57,6 +57,12 @@ public abstract class AbstractClientCompatibilityTest 
extends IgniteCompatibilit
     /** Version 2.9.0. */
     protected static final IgniteProductVersion VER_2_9_0 = 
IgniteProductVersion.fromString("2.9.0");
 
+    /** Version 2.10.0. */
+    protected static final IgniteProductVersion VER_2_10_0 = 
IgniteProductVersion.fromString("2.10.0");
+
+    /** Version 2.11.0. */
+    protected static final IgniteProductVersion VER_2_11_0 = 
IgniteProductVersion.fromString("2.11.0");
+
     /** Ignite versions to test. Note: Only released versions or current 
version should be included to this list. */
     protected static final String[] TESTED_IGNITE_VERSIONS = new String[] {
         "2.4.0",
@@ -68,6 +74,8 @@ public abstract class AbstractClientCompatibilityTest extends 
IgniteCompatibilit
         "2.8.0",
         "2.8.1",
         "2.9.0",
+        "2.9.1",
+        "2.10.0",
         IgniteVersionUtils.VER_STR
     };
 
diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 0b870bf..3aa6544 100644
--- 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.compatibility.clients;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
 import javax.cache.expiry.CreatedExpiryPolicy;
 import javax.cache.expiry.Duration;
 import org.apache.ignite.Ignite;
@@ -32,6 +34,7 @@ import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientCacheConfiguration;
@@ -52,6 +55,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Assume;
@@ -313,6 +317,25 @@ public class JavaThinCompatibilityTest extends 
AbstractClientCompatibilityTest {
         }
     }
 
+    /** */
+    private void testContinuousQueries() throws Exception {
+        X.println(">>>> Testing continuous queries");
+
+        try (IgniteClient client = Ignition.startClient(new 
ClientConfiguration().setAddresses(ADDR))) {
+            ClientCache<Object, Object> cache = 
client.getOrCreateCache("testContinuousQueries");
+
+            List<CacheEntryEvent<?, ?>> allEvts = new ArrayList<>();
+
+            cache.query(new ContinuousQuery<>().setLocalListener(evts -> 
evts.forEach(allEvts::add)));
+
+            cache.put(0, 0);
+            cache.put(0, 1);
+            cache.remove(0);
+
+            assertTrue(GridTestUtils.waitForCondition(() -> allEvts.size() == 
3, 1_000L));
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected void testClient(IgniteProductVersion clientVer, 
IgniteProductVersion serverVer) throws Exception {
         IgniteProductVersion minVer = clientVer.compareTo(serverVer) < 0 ? 
clientVer : serverVer;
@@ -345,6 +368,9 @@ public class JavaThinCompatibilityTest extends 
AbstractClientCompatibilityTest {
             testCompute();
             testServices();
         }
+
+        if (clientVer.compareTo(VER_2_11_0) >= 0 && 
serverVer.compareTo(VER_2_10_0) >= 0)
+            testContinuousQueries();
     }
 
     /** */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java 
b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
index f1450c7..60bc28d 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
@@ -22,9 +22,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryListener;
 import javax.cache.expiry.ExpiryPolicy;
-
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -703,19 +705,73 @@ public interface ClientCache<K, V> {
     public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy 
expirePlc);
 
     /**
-     * Queries cache. Supports {@link ScanQuery} and {@link SqlFieldsQuery}.
+     * Queries cache. Supports {@link ScanQuery}, {@link SqlFieldsQuery} and 
{@link ContinuousQuery}.
+     * <p>
+     * NOTE: For continuous query listeners there is no failover in case of 
client channel failure, this event should
+     * be handled on the user's side. Use {@link #query(ContinuousQuery, 
ClientDisconnectListener)} method to get
+     * notified about client disconnected event via {@link 
ClientDisconnectListener} interface if you need it.
      *
      * @param qry Query.
      * @return Cursor.
-     *
      */
     public <R> QueryCursor<R> query(Query<R> qry);
 
     /**
+     * Start {@link ContinuousQuery} on the cache.
+     * <p>
+     * NOTE: There is no failover in case of client channel failure, this 
event should be handled on the user's side.
+     * Use {@code disconnectListener} to handle this.
+     *
+     * @param qry Query.
+     * @param disconnectListener Listener of client disconnected event.
+     * @return Cursor.
+     */
+    public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, 
ClientDisconnectListener disconnectListener);
+
+    /**
      * Convenience method to execute {@link SqlFieldsQuery}.
      *
      * @param qry Query.
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry);
+
+    /**
+     * Registers a {@link CacheEntryListener}. The supplied {@link 
CacheEntryListenerConfiguration} is used to
+     * instantiate a listener and apply it to those events specified in the 
configuration.
+     * <p>
+     * NOTE: There is no failover in case of client channel failure, this 
event should be handled on the user's side.
+     * Use {@link #registerCacheEntryListener(CacheEntryListenerConfiguration, 
ClientDisconnectListener)} method to get
+     * notified about client disconnected event via {@link 
ClientDisconnectListener} interface if you need it.
+     *
+     * @param cacheEntryListenerConfiguration a factory and related 
configuration for creating the listener.
+     * @throws IllegalArgumentException is the same 
CacheEntryListenerConfiguration is used more than once or
+     *          if some unsupported by thin client properties are set.
+     * @see CacheEntryListener
+     */
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, 
V> cacheEntryListenerConfiguration);
+
+    /**
+     * Registers a {@link CacheEntryListener}. The supplied {@link 
CacheEntryListenerConfiguration} is used to
+     * instantiate a listener and apply it to those events specified in the 
configuration.
+     * <p>
+     * NOTE: There is no failover in case of client channel failure, this 
event should be handled on the user's side.
+     * Use {@code disconnectListener} to handle this.
+     *
+     * @param cacheEntryListenerConfiguration a factory and related 
configuration for creating the listener.
+     * @param disconnectListener Listener of client disconnected event.
+     * @throws IllegalArgumentException is the same 
CacheEntryListenerConfiguration is used more than once or
+     *          if some unsupported by thin client properties are set.
+     * @see CacheEntryListener
+     */
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, 
V> cacheEntryListenerConfiguration,
+        ClientDisconnectListener disconnectListener);
+
+    /**
+     * Deregisters a listener, using the {@link 
CacheEntryListenerConfiguration} that was used to register it.
+     *
+     * @param cacheEntryListenerConfiguration the factory and related 
configuration that was used to create the
+     *         listener.
+     */
+    public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> 
cacheEntryListenerConfiguration);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
 
b/modules/core/src/main/java/org/apache/ignite/client/ClientDisconnectListener.java
similarity index 54%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
copy to 
modules/core/src/main/java/org/apache/ignite/client/ClientDisconnectListener.java
index f792908..e2c76b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/client/ClientDisconnectListener.java
@@ -15,12 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.client;
+
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import org.apache.ignite.cache.query.ContinuousQuery;
 
 /**
- * Notification types.
+ * Client disconnected event listener. Such listeners can be used in {@link 
ClientCache#query(ContinuousQuery,
+ * ClientDisconnectListener)} or {@link 
ClientCache#registerCacheEntryListener(CacheEntryListenerConfiguration,
+ * ClientDisconnectListener)} methods to handle client channel failure.
  */
-enum ClientNotificationType {
-    /** Compute task finished. */
-    COMPUTE_TASK_FINISHED;
+public interface ClientDisconnectListener {
+    /**
+     * Client disconnected callback.
+     *
+     * @param reason Exception that caused the disconnect, can be {@code null}.
+     */
+    public void onDisconnected(Exception reason);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
new file mode 100644
index 0000000..3f27881
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.client.ClientDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static 
org.apache.ignite.internal.client.thin.ClientNotificationType.CONTINUOUS_QUERY_EVENT;
+import static 
org.apache.ignite.internal.client.thin.TcpClientCache.JAVA_PLATFORM;
+
+/**
+ * Handler for {@link ContinuousQuery} listeners and JCache cache entry 
listeners.
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements 
NotificationListener, AutoCloseable {
+    /** "Keep binary" flag mask. */
+    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+    /** */
+    private final Cache<K, V> jCacheAdapter;
+
+    /** */
+    private final ReliableChannel ch;
+
+    /** */
+    private final boolean keepBinary;
+
+    /** */
+    private final ClientUtils utils;
+
+    /** */
+    private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+    /** */
+    private volatile ClientDisconnectListener disconnectLsnr;
+
+    /** */
+    private volatile ClientChannel clientCh;
+
+    /** */
+    private volatile Long rsrcId;
+
+    /** */
+    ClientCacheEntryListenerHandler(
+        Cache<K, V> jCacheAdapter,
+        ReliableChannel ch,
+        ClientBinaryMarshaller marsh,
+        boolean keepBinary
+    ) {
+        this.jCacheAdapter = jCacheAdapter;
+        this.ch = ch;
+        this.keepBinary = keepBinary;
+        utils = new ClientUtils(marsh);
+    }
+
+    /**
+     * Send request to the server and start
+     */
+    public synchronized void startListen(
+        CacheEntryUpdatedListener<K, V> locLsnr,
+        ClientDisconnectListener disconnectLsnr,
+        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> 
rmtFilterFactory,
+        int pageSize,
+        long timeInterval,
+        boolean includeExpired
+    ) {
+        assert locLsnr != null;
+
+        if (clientCh != null)
+            throw new IllegalStateException("Listener was already started");
+
+        this.locLsnr = locLsnr;
+        this.disconnectLsnr = disconnectLsnr;
+
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
+            out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+            out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+            out.writeInt(pageSize);
+            out.writeLong(timeInterval);
+            out.writeBoolean(includeExpired);
+
+            if (rmtFilterFactory == null)
+                out.writeByte(GridBinaryMarshaller.NULL);
+            else {
+                utils.writeObject(out, rmtFilterFactory);
+                out.writeByte(JAVA_PLATFORM);
+            }
+        };
+
+        Function<PayloadInputChannel, T2<ClientChannel, Long>> qryReader = 
payloadCh -> {
+            ClientChannel ch = payloadCh.clientChannel();
+            Long rsrcId = payloadCh.in().readLong();
+
+            ch.addNotificationListener(CONTINUOUS_QUERY_EVENT, rsrcId, this);
+
+            return new T2<>(ch, rsrcId);
+        };
+
+        try {
+            T2<ClientChannel, Long> params = 
ch.service(ClientOperation.QUERY_CONTINUOUS, qryWriter, qryReader);
+
+            clientCh = params.get1();
+            rsrcId = params.get2();
+        }
+        catch (ClientError e) {
+            throw new ClientException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptNotification(ByteBuffer payload, Exception 
err) {
+        if (err == null && payload != null) {
+            BinaryInputStream in = BinaryByteBufferInputStream.create(payload);
+
+            int cnt = in.readInt();
+
+            List<CacheEntryEvent<? extends K, ? extends V>> evts = new 
ArrayList<>(cnt);
+
+            for (int i = 0; i < cnt; i++) {
+                K key = utils.readObject(in, keepBinary);
+                V oldVal = utils.readObject(in, keepBinary);
+                V val = utils.readObject(in, keepBinary);
+                byte evtTypeByte = in.readByte();
+
+                EventType evtType = eventType(evtTypeByte);
+
+                if (evtType == null)
+                    onChannelClosed(new ClientException("Unknown event type: " 
+ evtTypeByte));
+
+                evts.add(new CacheEntryEventImpl<>(jCacheAdapter, evtType, 
key, oldVal, val));
+            }
+
+            locLsnr.onUpdated(evts);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onChannelClosed(Exception reason) {
+        ClientDisconnectListener lsnr = disconnectLsnr;
+
+        if (lsnr != null)
+            lsnr.onDisconnected(reason);
+
+        U.closeQuiet(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() {
+        ClientChannel clientCh = this.clientCh;
+
+        if (clientCh != null && !clientCh.closed()) {
+            clientCh.removeNotificationListener(CONTINUOUS_QUERY_EVENT, 
rsrcId);
+
+            clientCh.service(ClientOperation.RESOURCE_CLOSE, ch -> 
ch.out().writeLong(rsrcId), null);
+        }
+    }
+
+    /**
+     * Client channel.
+     */
+    public ClientChannel clientChannel() {
+        return clientCh;
+    }
+
+    /** */
+    private EventType eventType(byte evtTypeByte) {
+        switch (evtTypeByte) {
+            case 0: return EventType.CREATED;
+            case 1: return EventType.UPDATED;
+            case 2: return EventType.REMOVED;
+            case 3: return EventType.EXPIRED;
+            default: return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEntryEventImpl<K, V> extends CacheEntryEvent<K, 
V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Key. */
+        private final K key;
+
+        /** Old value. */
+        private final V oldVal;
+
+        /** Value. */
+        private final V val;
+
+        /**
+         *
+         */
+        private CacheEntryEventImpl(Cache<K, V> src, EventType evtType, K key, 
V oldVal, V val) {
+            super(src, evtType);
+
+            this.key = key;
+            this.oldVal = oldVal;
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public K getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V getOldValue() {
+            return oldVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isOldValueAvailable() {
+            return oldVal != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T unwrap(Class<T> clazz) {
+            if (clazz.isAssignableFrom(getClass()))
+                return clazz.cast(this);
+
+            throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + clazz);
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
new file mode 100644
index 0000000..07bb799
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside 
ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {
+    /** */
+    private final Map<String, Map<CacheEntryListenerConfiguration<?, ?>,
+        ClientCacheEntryListenerHandler<?, ?>>> lsnrs = new 
ConcurrentHashMap<>();
+
+    /**
+     * Register listener handler.
+     *
+     * @return {@code True} if listener was succesfuly registered,
+     *         {@code false} if listener was already registered before.
+     */
+    public boolean registerCacheEntryListener(String cacheName, 
CacheEntryListenerConfiguration<?, ?> cfg,
+        ClientCacheEntryListenerHandler<?, ?> hnd) {
+        Map<CacheEntryListenerConfiguration<?, ?>, 
ClientCacheEntryListenerHandler<?, ?>> cacheLsnrs =
+            lsnrs.computeIfAbsent(cacheName, k -> new ConcurrentHashMap<>());
+
+        ClientCacheEntryListenerHandler<?, ?> old = 
cacheLsnrs.putIfAbsent(cfg, hnd);
+
+        return old == null;
+    }
+
+    /**
+     * Deregister listener handler.
+     *
+     * @return Listener handler.
+     */
+    public ClientCacheEntryListenerHandler<?, ?> 
deregisterCacheEntryListener(String cacheName,
+        CacheEntryListenerConfiguration<?, ?> cfg) {
+        Map<CacheEntryListenerConfiguration<?, ?>, 
ClientCacheEntryListenerHandler<?, ?>> cacheLsnrs =
+            lsnrs.get(cacheName);
+
+        return cacheLsnrs != null ? cacheLsnrs.remove(cfg) : null;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientContinuousQueryCursor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientContinuousQueryCursor.java
new file mode 100644
index 0000000..0ff1f41
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientContinuousQueryCursor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Thin client continuous query cursor.
+ */
+class ClientContinuousQueryCursor<T> implements QueryCursor<T> {
+    /** Initial query cursor. */
+    private final QueryCursor<T> initQryCursor;
+
+    /** Cache entry listener handler. */
+    private final ClientCacheEntryListenerHandler<?, ?> lsnrHnd;
+
+    /**
+     * @param initQryCursor Initial query cursor.
+     * @param lsnrHnd Cache entry listener handler.
+     */
+    ClientContinuousQueryCursor(QueryCursor<T> initQryCursor, 
ClientCacheEntryListenerHandler<?, ?> lsnrHnd) {
+        this.initQryCursor = initQryCursor;
+        this.lsnrHnd = lsnrHnd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<T> getAll() {
+        return initQryCursor == null ? Collections.emptyList() : 
initQryCursor.getAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        U.closeQuiet(initQryCursor);
+        U.closeQuiet(lsnrHnd);
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Iterator<T> iterator() {
+        return initQryCursor == null ? Collections.emptyIterator() : 
initQryCursor.iterator();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
new file mode 100644
index 0000000..45d3cf6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Configuration;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.client.ClientCache;
+
+/**
+ * Thin client cache to JCache compatible cache adapter.
+ */
+class ClientJCacheAdapter<K, V> implements Cache<K, V> {
+    /** Delegate. */
+    private final ClientCache<K, V> delegate;
+
+    /**
+     * @param delegate Delegate.
+     */
+    ClientJCacheAdapter(ClientCache<K, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V get(K key) {
+        return delegate.get(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> getAll(Set<? extends K> keys) {
+        return delegate.getAll(keys);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsKey(K key) {
+        return delegate.containsKey(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadAll(Set<? extends K> keys, boolean 
replaceExistingValues,
+        CompletionListener completionListener) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void put(K key, V val) {
+        delegate.put(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getAndPut(K key, V val) {
+        return delegate.getAndPut(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putAll(Map<? extends K, ? extends V> map) {
+        delegate.putAll(map);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean putIfAbsent(K key, V val) {
+        return delegate.putIfAbsent(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean remove(K key) {
+        return delegate.remove(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean remove(K key, V oldVal) {
+        return delegate.remove(key, oldVal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getAndRemove(K key) {
+        return delegate.getAndRemove(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean replace(K key, V oldVal, V newVal) {
+        return delegate.replace(key, oldVal, newVal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean replace(K key, V val) {
+        return delegate.replace(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public V getAndReplace(K key, V val) {
+        return delegate.getAndReplace(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeAll(Set<? extends K> keys) {
+        delegate.removeAll(keys);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeAll() {
+        delegate.removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        delegate.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <C extends Configuration<K, V>> C 
getConfiguration(Class<C> clazz) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProc,
+        Object... arguments) throws EntryProcessorException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
+        EntryProcessor<K, V, T> entryProc, Object... arguments) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return delegate.getName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheManager getCacheManager() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClosed() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unwrap(Class<T> clazz) {
+        if (clazz.isAssignableFrom(delegate.getClass()))
+            return (T)delegate;
+
+        throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + clazz);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerCacheEntryListener(
+        CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) 
{
+        delegate.registerCacheEntryListener(cacheEntryListenerConfiguration);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deregisterCacheEntryListener(
+        CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) 
{
+        delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<Entry<K, V>> iterator() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheEntryListenerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheEntryListenerAdapter.java
new file mode 100644
index 0000000..742af38
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheEntryListenerAdapter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+
+/**
+ * Adapter to convert CQ listener calls to JCache listener calls.
+ */
+class ClientJCacheEntryListenerAdapter<K, V> implements 
CacheEntryUpdatedListener<K, V> {
+    /** Created listener. */
+    private final CacheEntryCreatedListener<K, V> crtLsnr;
+
+    /** Updated listener. */
+    private final CacheEntryUpdatedListener<K, V> updLsnr;
+
+    /** Removed listener. */
+    private final CacheEntryRemovedListener<K, V> rmvLsnr;
+
+    /** Expired listener. */
+    private final CacheEntryExpiredListener<K, V> expLsnr;
+
+    /** */
+    ClientJCacheEntryListenerAdapter(CacheEntryListener<? super K, ? super V> 
impl) {
+        crtLsnr = impl instanceof CacheEntryCreatedListener ? 
(CacheEntryCreatedListener<K, V>)impl : evts -> {};
+        updLsnr = impl instanceof CacheEntryUpdatedListener ? 
(CacheEntryUpdatedListener<K, V>)impl : evts -> {};
+        rmvLsnr = impl instanceof CacheEntryRemovedListener ? 
(CacheEntryRemovedListener<K, V>)impl : evts -> {};
+        expLsnr = impl instanceof CacheEntryExpiredListener ? 
(CacheEntryExpiredListener<K, V>)impl : evts -> {};
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? 
extends V>> evts) {
+        for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
+            try {
+                Iterable<CacheEntryEvent<? extends K, ? extends V>> evtColl = 
Collections.singleton(evt);
+
+                switch (evt.getEventType()) {
+                    case CREATED: crtLsnr.onCreated(evtColl); break;
+                    case UPDATED: updLsnr.onUpdated(evtColl); break;
+                    case REMOVED: rmvLsnr.onRemoved(evtColl); break;
+                    case EXPIRED: expLsnr.onExpired(evtColl); break;
+                }
+            }
+            catch (Exception ignored) {
+                // Ignore exceptions in user code.
+            }
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
index f792908..1a716ac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
@@ -21,6 +21,25 @@ package org.apache.ignite.internal.client.thin;
  * Notification types.
  */
 enum ClientNotificationType {
+    /** Continuous query  event. */
+    CONTINUOUS_QUERY_EVENT(false),
+
     /** Compute task finished. */
-    COMPUTE_TASK_FINISHED;
+    COMPUTE_TASK_FINISHED(true);
+
+    /** */
+    private final boolean keepNotificationsWithoutListener;
+
+    /** */
+    ClientNotificationType(boolean keepNotificationsWithoutListener) {
+        this.keepNotificationsWithoutListener = 
keepNotificationsWithoutListener;
+    }
+
+    /**
+     * @return {@code True} if it's required to save all received 
notifications when listener for this notification
+     * is not yet registered. These notifications will be processed when 
listener for their resource will be registered.
+     */
+    public boolean keepNotificationsWithoutListener() {
+        return keepNotificationsWithoutListener;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 987345d..1abb51a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -61,6 +61,8 @@ enum ClientOperation {
     /** Query sql cursor get page. */QUERY_SQL_CURSOR_GET_PAGE(2003),
     /** Query sql fields. */QUERY_SQL_FIELDS(2004),
     /** Query sql fields cursor get page. 
*/QUERY_SQL_FIELDS_CURSOR_GET_PAGE(2005),
+    /** Continuous query. */QUERY_CONTINUOUS(2006),
+    /** Continuous query event. */QUERY_CONTINUOUS_EVENT(2007, 
ClientNotificationType.CONTINUOUS_QUERY_EVENT),
 
     /** Get binary type. */GET_BINARY_TYPE(3002),
     /** Register binary type name. */REGISTER_BINARY_TYPE_NAME(3001),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index cf9b70b..8857077 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -25,8 +25,13 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.cache.Cache;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -35,12 +40,15 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientDisconnectListener;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.IgniteClientFuture;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import 
org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
@@ -59,6 +67,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
     /** "With expiry policy" flag mask. */
     private static final byte WITH_EXPIRY_POLICY_FLAG_MASK = 0x04;
 
+    /** Platform type: Java platform. */
+    static final byte JAVA_PLATFORM = 1;
+
     /** Cache id. */
     private final int cacheId;
 
@@ -83,24 +94,34 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
     /** Expiry policy. */
     private final ExpiryPolicy expiryPlc;
 
+    /** Cache entry listeners registry. */
+    private final ClientCacheEntryListenersRegistry lsnrsRegistry;
+
+    /** JCache adapter. */
+    private final Cache<K, V> jCacheAdapter;
+
     /** Constructor. */
-    TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller 
marsh, TcpClientTransactions transactions) {
-        this(name, ch, marsh, transactions, false, null);
+    TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller 
marsh, TcpClientTransactions transactions,
+        ClientCacheEntryListenersRegistry lsnrsRegistry) {
+        this(name, ch, marsh, transactions, lsnrsRegistry, false, null);
     }
 
     /** Constructor. */
     TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller 
marsh, TcpClientTransactions transactions,
-        boolean keepBinary, ExpiryPolicy expiryPlc) {
+        ClientCacheEntryListenersRegistry lsnrsRegistry, boolean keepBinary, 
ExpiryPolicy expiryPlc) {
         this.name = name;
         this.cacheId = ClientUtils.cacheId(name);
         this.ch = ch;
         this.marsh = marsh;
         this.transactions = transactions;
+        this.lsnrsRegistry = lsnrsRegistry;
 
         serDes = new ClientUtils(marsh);
 
         this.keepBinary = keepBinary;
         this.expiryPlc = expiryPlc;
+
+        jCacheAdapter = new ClientJCacheAdapter<>(this);
     }
 
     /** {@inheritDoc} */
@@ -697,12 +718,12 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
     /** {@inheritDoc} */
     @Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
         return keepBinary ? (ClientCache<K1, V1>)this :
-            new TcpClientCache<>(name, ch, marsh, transactions, true, 
expiryPlc);
+            new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, 
true, expiryPlc);
     }
 
     /** {@inheritDoc} */
     @Override public <K1, V1> ClientCache<K1, V1> 
withExpirePolicy(ExpiryPolicy expirePlc) {
-        return new TcpClientCache<>(name, ch, marsh, transactions, keepBinary, 
expirePlc);
+        return new TcpClientCache<>(name, ch, marsh, transactions, 
lsnrsRegistry, keepBinary, expirePlc);
     }
 
     /** {@inheritDoc} */
@@ -719,6 +740,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             res = (QueryCursor<R>)sqlQuery((SqlQuery)qry);
         else if (qry instanceof SqlFieldsQuery)
             res = (QueryCursor<R>)query((SqlFieldsQuery)qry);
+        else if (qry instanceof ContinuousQuery)
+            res = query((ContinuousQuery<K, V>)qry, null);
         else
             throw new IllegalArgumentException(
                 String.format("Query of type [%s] is not supported", 
qry.getClass().getSimpleName())
@@ -747,6 +770,99 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         ));
     }
 
+    /** {@inheritDoc} */
+    @Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, 
ClientDisconnectListener disconnectLsnr) {
+        A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial 
query for continuous query " +
+            "can't be an instance of another continuous query");
+        A.notNull(qry.getLocalListener(), "Local listener");
+        A.ensure(!qry.isLocal(), "Local query is not supported by thin 
client");
+        A.ensure(qry.isAutoUnsubscribe(), "AutoUnsubscribe flag is not 
supported by thin client");
+        A.ensure(qry.getRemoteFilterFactory() == null || qry.getRemoteFilter() 
== null,
+            "RemoteFilter and RemoteFilterFactory can't be used together");
+
+        ClientCacheEntryListenerHandler<K, V> hnd = new 
ClientCacheEntryListenerHandler<>(
+            jCacheAdapter,
+            ch,
+            marsh,
+            keepBinary
+        );
+
+        hnd.startListen(
+            qry.getLocalListener(),
+            disconnectLsnr,
+            qry.getRemoteFilterFactory() != null ? 
qry.getRemoteFilterFactory() : qry.getRemoteFilter() != null ?
+                FactoryBuilder.factoryOf(qry.getRemoteFilter()) : null,
+            qry.getPageSize(),
+            qry.getTimeInterval(),
+            qry.isIncludeExpired()
+        );
+
+        if (qry.getInitialQuery() != null) {
+            try {
+                QueryCursor<R> cur = 
(QueryCursor<R>)query(qry.getInitialQuery());
+
+                return new ClientContinuousQueryCursor<>(cur, hnd);
+            }
+            catch (Exception e) {
+                U.closeQuiet(hnd);
+
+                throw e;
+            }
+        }
+        else
+            return new ClientContinuousQueryCursor<>(null, hnd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
+        registerCacheEntryListener(cfg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg,
+        ClientDisconnectListener disconnectLsnr) {
+        A.ensure(!cfg.isSynchronous(),
+            "Unsupported cfg.isSynchronous() flag value");
+
+        A.notNull(cfg.getCacheEntryListenerFactory(), 
"cfg.getCacheEntryListenerFactory()");
+
+        ClientCacheEntryListenerHandler<K, V> hnd = new 
ClientCacheEntryListenerHandler<>(
+            jCacheAdapter,
+            ch,
+            marsh,
+            keepBinary
+        );
+
+        if (lsnrsRegistry.registerCacheEntryListener(name, cfg, hnd)) {
+            CacheEntryListener<? super K, ? super V> locLsnr = 
cfg.getCacheEntryListenerFactory().create();
+
+            ClientDisconnectListener disconnectLsnr0 = e -> {
+                if (disconnectLsnr != null)
+                    disconnectLsnr.onDisconnected(e);
+
+                lsnrsRegistry.deregisterCacheEntryListener(name, cfg);
+            };
+
+            hnd.startListen(
+                new ClientJCacheEntryListenerAdapter<>(locLsnr),
+                disconnectLsnr0,
+                cfg.getCacheEntryEventFilterFactory(),
+                ContinuousQuery.DFLT_PAGE_SIZE,
+                ContinuousQuery.DFLT_TIME_INTERVAL,
+                locLsnr instanceof CacheEntryExpiredListener
+            );
+        }
+        else
+            throw new IllegalStateException("Listener is already registered 
for configuration: " + cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
+        ClientCacheEntryListenerHandler<?, ?> hnd = 
lsnrsRegistry.deregisterCacheEntryListener(name, cfg);
+
+        U.closeQuiet(hnd);
+    }
+
     /** Handle scan query. */
     private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
         Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
@@ -758,7 +874,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
                 out.writeByte(GridBinaryMarshaller.NULL);
             else {
                 serDes.writeObject(out, qry.getFilter());
-                out.writeByte((byte)1); // Java platform
+                out.writeByte(JAVA_PLATFORM);
             }
 
             out.writeInt(qry.getPageSize());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 3cd8688..4246b07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -153,8 +153,10 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
         validateConfiguration(cfg);
 
-        for (ClientNotificationType type : ClientNotificationType.values())
-            pendingNotifications[type.ordinal()] = new ConcurrentHashMap<>();
+        for (ClientNotificationType type : ClientNotificationType.values()) {
+            if (type.keepNotificationsWithoutListener())
+                pendingNotifications[type.ordinal()] = new 
ConcurrentHashMap<>();
+        }
 
         Executor cfgExec = cfg.getAsyncContinuationExecutor();
         asyncContinuationExecutor = cfgExec != null ? cfgExec : 
ForkJoinPool.commonPool();
@@ -447,7 +449,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                     if (lsrns != null)
                         lsnr = lsrns.get(resId);
 
-                    if (lsnr == null) {
+                    if (notificationType.keepNotificationsWithoutListener() && 
lsnr == null) {
                         
pendingNotifications[notificationType.ordinal()].computeIfAbsent(resId,
                             k -> new ConcurrentLinkedQueue<>()).add(new 
T2<>(res, err));
                     }
@@ -484,7 +486,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
     /** {@inheritDoc} */
     @Override public void addNotificationListener(ClientNotificationType type, 
Long rsrcId, NotificationListener lsnr) {
-        Queue<T2<ByteBuffer, Exception>> pendingQueue;
+        Queue<T2<ByteBuffer, Exception>> pendingQueue = null;
 
         notificationLsnrsGuard.writeLock().lock();
 
@@ -499,7 +501,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
             lsnrs.put(rsrcId, lsnr);
 
-            pendingQueue = pendingNotifications[type.ordinal()].remove(rsrcId);
+            if (type.keepNotificationsWithoutListener())
+                pendingQueue = 
pendingNotifications[type.ordinal()].remove(rsrcId);
         }
         finally {
             notificationLsnrsGuard.writeLock().unlock();
@@ -522,7 +525,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
             lsnrs.remove(rsrcId);
 
-            pendingNotifications[type.ordinal()].remove(rsrcId);
+            if (type.keepNotificationsWithoutListener())
+                pendingNotifications[type.ordinal()].remove(rsrcId);
 
         }
         finally {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index c67184a..617948d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -84,6 +84,9 @@ public class TcpIgniteClient implements IgniteClient {
     /** Services facade. */
     private final ClientServicesImpl services;
 
+    /** Registered entry listeners for all caches. */
+    private final ClientCacheEntryListenersRegistry lsnrsRegistry;
+
     /** Marshaller. */
     private final ClientBinaryMarshaller marsh;
 
@@ -130,6 +133,8 @@ public class TcpIgniteClient implements IgniteClient {
             compute = new ClientComputeImpl(ch, marsh, 
cluster.defaultClusterGroup());
 
             services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup());
+
+            lsnrsRegistry = new ClientCacheEntryListenersRegistry();
         }
         catch (Exception e) {
             ch.close();
@@ -148,7 +153,7 @@ public class TcpIgniteClient implements IgniteClient {
 
         ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> 
writeString(name, req.out()));
 
-        return new TcpClientCache<>(name, ch, marsh, transactions);
+        return new TcpClientCache<>(name, ch, marsh, transactions, 
lsnrsRegistry);
     }
 
     /** {@inheritDoc} */
@@ -157,7 +162,7 @@ public class TcpIgniteClient implements IgniteClient {
 
         return new IgniteClientFutureImpl<>(
                 ch.requestAsync(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, 
req -> writeString(name, req.out()))
-                        .thenApply(x -> new TcpClientCache<>(name, ch, marsh, 
transactions)));
+                        .thenApply(x -> new TcpClientCache<>(name, ch, marsh, 
transactions, lsnrsRegistry)));
     }
 
     /** {@inheritDoc} */
@@ -168,7 +173,7 @@ public class TcpIgniteClient implements IgniteClient {
         ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION,
             req -> serDes.cacheConfiguration(cfg, req.out(), 
req.clientChannel().protocolCtx()));
 
-        return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions);
+        return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, 
lsnrsRegistry);
     }
 
     /** {@inheritDoc} */
@@ -179,14 +184,14 @@ public class TcpIgniteClient implements IgniteClient {
         return new IgniteClientFutureImpl<>(
                 
ch.requestAsync(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION,
                         req -> serDes.cacheConfiguration(cfg, req.out(), 
req.clientChannel().protocolCtx()))
-                        .thenApply(x -> new TcpClientCache<>(cfg.getName(), 
ch, marsh, transactions)));
+                        .thenApply(x -> new TcpClientCache<>(cfg.getName(), 
ch, marsh, transactions, lsnrsRegistry)));
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> ClientCache<K, V> cache(String name) {
         ensureCacheName(name);
 
-        return new TcpClientCache<>(name, ch, marsh, transactions);
+        return new TcpClientCache<>(name, ch, marsh, transactions, 
lsnrsRegistry);
     }
 
     /** {@inheritDoc} */
@@ -221,7 +226,7 @@ public class TcpIgniteClient implements IgniteClient {
 
         ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req -> 
writeString(name, req.out()));
 
-        return new TcpClientCache<>(name, ch, marsh, transactions);
+        return new TcpClientCache<>(name, ch, marsh, transactions, 
lsnrsRegistry);
     }
 
     /** {@inheritDoc} */
@@ -230,7 +235,7 @@ public class TcpIgniteClient implements IgniteClient {
 
         return new IgniteClientFutureImpl<>(
                 ch.requestAsync(ClientOperation.CACHE_CREATE_WITH_NAME, req -> 
writeString(name, req.out()))
-                        .thenApply(x -> new TcpClientCache<>(name, ch, marsh, 
transactions)));
+                        .thenApply(x -> new TcpClientCache<>(name, ch, marsh, 
transactions, lsnrsRegistry)));
     }
 
     /** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class TcpIgniteClient implements IgniteClient {
         ch.request(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION,
             req -> serDes.cacheConfiguration(cfg, req.out(), 
req.clientChannel().protocolCtx()));
 
-        return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions);
+        return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions, 
lsnrsRegistry);
     }
 
     /** {@inheritDoc} */
@@ -251,7 +256,7 @@ public class TcpIgniteClient implements IgniteClient {
         return new IgniteClientFutureImpl<>(
                 
ch.requestAsync(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION,
                         req -> serDes.cacheConfiguration(cfg, req.out(), 
req.clientChannel().protocolCtx()))
-                        .thenApply(x -> new TcpClientCache<>(cfg.getName(), 
ch, marsh, transactions)));
+                        .thenApply(x -> new TcpClientCache<>(cfg.getName(), 
ch, marsh, transactions, lsnrsRegistry)));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 0deb6c9..c34b38e 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -40,8 +40,6 @@ import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.failure.FailureHandler;
 import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
 import org.apache.ignite.internal.client.thin.ClientServerError;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
-import org.apache.ignite.mxbean.ClientProcessorMXBean;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
@@ -386,18 +384,6 @@ public class ReliabilityTest extends 
AbstractThinClientTest {
     }
 
     /**
-     * Drop all thin client connections on given Ignite instance.
-     *
-     * @param ignite Ignite.
-     */
-    private void dropAllThinClientConnections(Ignite ignite) {
-        ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
-            ClientListenerProcessor.class, ClientProcessorMXBean.class);
-
-        mxBean.dropAllConnections();
-    }
-
-    /**
      * Run the closure while Ignite nodes keep failing/recovering several 
times.
      */
     private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable 
clo) throws Exception {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
index 2e2f21d..486939d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
@@ -25,6 +25,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -94,4 +96,24 @@ public abstract class AbstractThinClientTest extends 
GridCommonAbstractTest {
         return startClient(Arrays.stream(igniteIdxs).mapToObj(igniteIdx -> 
grid(igniteIdx).cluster().localNode())
             .toArray(ClusterNode[]::new));
     }
+
+    /**
+     * Drop all thin client connections on given Ignite instance.
+     *
+     * @param ignite Ignite.
+     */
+    protected void dropAllThinClientConnections(Ignite ignite) {
+        ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
+            ClientListenerProcessor.class, ClientProcessorMXBean.class);
+
+        mxBean.dropAllConnections();
+    }
+
+    /**
+     * Drop all thin client connections on all Ignite instances.
+     */
+    protected void dropAllThinClientConnections() {
+        for (Ignite ignite : G.allGrids())
+            dropAllThinClientConnections(ignite);
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
new file mode 100644
index 0000000..49496bca
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.Cache;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientDisconnectListener;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Thin client cache entry listeners test.
+ */
+public class CacheEntryListenersTest extends AbstractThinClientTest {
+    /** Timeout. */
+    private static final long TIMEOUT = 1_000L;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return 
super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
+            new ClientConnectorConfiguration().setThinClientConfiguration(
+                new 
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(100)));
+    }
+
+    /** Test continuous queries. */
+    @Test
+    public void testContinuousQueries() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testCQ");
+
+            ContinuousQueryListener<Integer, Integer> lsnr = new 
ContinuousQueryListener<>();
+
+            cache.query(new ContinuousQuery<Integer, 
Integer>().setLocalListener(lsnr));
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            assertEquals(F.asMap(EventType.CREATED, IntStream.range(0, 
10).boxed()
+                .collect(Collectors.toMap(i -> i, i -> i))), 
aggregateListenerEvents(lsnr, 10));
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, -i);
+
+            assertEquals(F.asMap(EventType.UPDATED, IntStream.range(0, 
10).boxed()
+                .collect(Collectors.toMap(i -> i, i -> -i))), 
aggregateListenerEvents(lsnr, 10));
+
+            for (int i = 0; i < 10; i++)
+                cache.remove(i);
+
+            assertEquals(F.asMap(EventType.REMOVED, IntStream.range(0, 
10).boxed()
+                .collect(Collectors.toMap(i -> i, i -> -i))), 
aggregateListenerEvents(lsnr, 10));
+
+            assertTrue(lsnr.isQueueEmpty());
+        }
+    }
+
+    /** Test continuous queries with initial query. */
+    @Test
+    public void testContinuousQueriesWithInitialQuery() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testCQWithInitQ");
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            ContinuousQueryListener<Integer, Integer> lsnr = new 
ContinuousQueryListener<>();
+
+            QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(new 
ContinuousQuery<Integer, Integer>()
+                .setInitialQuery(new ScanQuery<>()).setLocalListener(lsnr));
+
+            assertTrue(lsnr.isQueueEmpty());
+
+            assertEquals(100, cur.getAll().size());
+
+            cache.put(100, 100);
+
+            lsnr.assertNextCacheEvent(EventType.CREATED, 100, 100);
+
+            cache.put(100, 101);
+
+            lsnr.assertNextCacheEvent(EventType.UPDATED, 100, 101);
+
+            cache.remove(100);
+
+            lsnr.assertNextCacheEvent(EventType.REMOVED, 100);
+        }
+    }
+
+    /** Test continuous queries with include expired parameter. */
+    @Test
+    public void testContinuousQueriesWithIncludeExpired() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testCQWithInclExp");
+
+            ContinuousQueryListener<Integer, Integer> lsnr1 = new 
ContinuousQueryListener<>();
+            ContinuousQueryListener<Integer, Integer> lsnr2 = new 
ContinuousQueryListener<>();
+
+            ContinuousQuery<Integer, Integer> qry1 = new 
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr1);
+            ContinuousQuery<Integer, Integer> qry2 = new 
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr2);
+
+            qry1.setIncludeExpired(false);
+            qry2.setIncludeExpired(true);
+
+            cache.query(qry1);
+            cache.query(qry2);
+
+            cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, 1)));
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            int cntCreated = 0;
+            int cntExpired = 0;
+
+            for (int i = 0; i < 100; i++)
+                assertEquals(EventType.CREATED, lsnr1.poll().getEventType());
+
+            for (int i = 0; i < 200; i++) { // There should be two events for 
each cache entry.
+                CacheEntryEvent<?, ?> evt = lsnr2.poll();
+
+                if (evt.getEventType() == EventType.CREATED)
+                    cntCreated++;
+                else if (evt.getEventType() == EventType.EXPIRED)
+                    cntExpired++;
+                else
+                    fail("Unexpected event type: " + evt.getEventType());
+            }
+
+            assertEquals(100, cntCreated);
+            assertEquals(100, cntExpired);
+
+            assertTrue(lsnr1.isQueueEmpty());
+            assertTrue(lsnr2.isQueueEmpty());
+        }
+    }
+
+    /** Test continuous queries with page size parameter. */
+    @Test
+    public void testContinuousQueriesWithPageSize() throws Exception {
+        try (IgniteClient client = startClient(1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testCQWithPageSize");
+            IgniteCache<Integer, Integer> nodeCache = 
grid(0).getOrCreateCache(cache.getName());
+
+            ContinuousQueryListener<Integer, Integer> lsnr = new 
ContinuousQueryListener<>();
+
+            ContinuousQuery<Integer, Integer> qry = new 
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr)
+                .setPageSize(10);
+
+            cache.query(qry);
+
+            // Each node has its own buffer, put data to the exactly one 
remote node.
+            primaryKeys(nodeCache, 15).forEach(key -> cache.put(key, key));
+
+            // Check that only first page is received.
+            aggregateListenerEvents(lsnr, 10);
+
+            assertTrue(lsnr.isQueueEmpty());
+
+            primaryKeys(nodeCache, 6).forEach(key -> cache.put(key, key));
+
+            // Check that only second page is received.
+            aggregateListenerEvents(lsnr, 10);
+
+            assertTrue(lsnr.isQueueEmpty());
+        }
+    }
+
+    /** Test continuous queries with time interval parameter. */
+    @Test
+    public void testContinuousQueriesWithTimeInterval() throws Exception {
+        try (IgniteClient client = startClient(1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testCQWithTimeInterval");
+            IgniteCache<Integer, Integer> nodeCache = 
grid(0).getOrCreateCache(cache.getName());
+
+            ContinuousQueryListener<Integer, Integer> lsnr = new 
ContinuousQueryListener<>();
+
+            ContinuousQuery<Integer, Integer> qry = new 
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr)
+                .setPageSize(10).setTimeInterval(TIMEOUT);
+
+            long ts1 = U.currentTimeMillis();
+
+            cache.query(qry);
+
+            // Put data to the remote node.
+            int key = primaryKey(nodeCache);
+            cache.put(key, key);
+
+            assertNotNull(lsnr.poll(TIMEOUT * 2));
+
+            assertTrue(lsnr.isQueueEmpty());
+
+            long ts2 = U.currentTimeMillis();
+
+            // Ensure that item was received after timeout.
+            assertTrue("ts2 - ts1 = " + (ts2 - ts1), ts2 - ts1 >= TIMEOUT);
+        }
+    }
+
+    /** Test JCache entry listeners. */
+    @Test
+    public void testJCacheListeners() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testJCacheListeners");
+
+            JCacheEntryListener<Integer, Integer> lsnr = new 
JCacheEntryListener<>();
+
+            cache.registerCacheEntryListener(new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr, null, true, false));
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            assertEquals(F.asMap(EventType.CREATED, IntStream.range(0, 
10).boxed()
+                .collect(Collectors.toMap(i -> i, i -> i))), 
aggregateListenerEvents(lsnr, 10));
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, -i);
+
+            assertEquals(F.asMap(EventType.UPDATED, IntStream.range(0, 
10).boxed()
+                .collect(Collectors.toMap(i -> i, i -> -i))), 
aggregateListenerEvents(lsnr, 10));
+
+            for (int i = 0; i < 10; i++)
+                cache.remove(i);
+
+            assertEquals(F.asMap(EventType.REMOVED, IntStream.range(0, 
10).boxed()
+                .collect(Collectors.toMap(i -> i, i -> -i))), 
aggregateListenerEvents(lsnr, 10));
+
+            assertTrue(lsnr.isQueueEmpty());
+        }
+    }
+
+    /** Test JCache entry listeners with expired entries. */
+    @Test
+    public void testJCacheListenersExpiredEntries() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testJCacheListenersWithExp");
+
+            JCacheEntryListener<Integer, Integer> lsnr = new 
JCacheEntryListener<>();
+
+            cache.registerCacheEntryListener(new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr, null, true, false));
+
+            cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new 
Duration(TimeUnit.MILLISECONDS, 1)));
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            int cntCreated = 0;
+            int cntExpired = 0;
+
+            for (int i = 0; i < 200; i++) { // There should be two events for 
each cache entry.
+                CacheEntryEvent<?, ?> evt = lsnr.poll();
+
+                if (evt.getEventType() == EventType.CREATED)
+                    cntCreated++;
+                else if (evt.getEventType() == EventType.EXPIRED)
+                    cntExpired++;
+                else
+                    fail("Unexpected event type: " + evt.getEventType());
+            }
+
+            assertEquals(100, cntCreated);
+            assertEquals(100, cntExpired);
+
+            assertTrue(lsnr.isQueueEmpty());
+        }
+    }
+
+    /** Test continuous queries and JCache entry listeners with keep binary 
flag. */
+    @Test
+    public void testListenersWithKeepBinary() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Object, Object> cache1 = 
client.getOrCreateCache("testListenersWithKB");
+            ClientCache<Object, Object> cache2 = cache1.withKeepBinary();
+
+            ContinuousQueryListener<Object, Object> lsnr1 = new 
ContinuousQueryListener<>();
+            ContinuousQueryListener<Object, Object> lsnr2 = new 
ContinuousQueryListener<>();
+
+            cache1.query(new ContinuousQuery<>().setLocalListener(lsnr1));
+            cache2.query(new ContinuousQuery<>().setLocalListener(lsnr2));
+
+            JCacheEntryListener<Object, Object> lsnr3 = new 
JCacheEntryListener<>();
+            JCacheEntryListener<Object, Object> lsnr4 = new 
JCacheEntryListener<>();
+
+            cache1.registerCacheEntryListener(new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr3, null, true, false));
+            cache2.registerCacheEntryListener(new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr4, null, true, false));
+
+            Person person1 = new Person(0, "name");
+            Person person2 = new Person(1, "another name");
+
+            cache1.put(0, person1);
+
+            lsnr1.assertNextCacheEvent(EventType.CREATED, 0, person1);
+            lsnr2.assertNextCacheEvent(EventType.CREATED, 0, 
client.binary().toBinary(person1));
+            lsnr3.assertNextCacheEvent(EventType.CREATED, 0, person1);
+            lsnr4.assertNextCacheEvent(EventType.CREATED, 0, 
client.binary().toBinary(person1));
+
+            cache1.put(0, person2);
+
+            lsnr1.assertNextCacheEvent(EventType.UPDATED, 0, person2);
+            lsnr2.assertNextCacheEvent(EventType.UPDATED, 0, 
client.binary().toBinary(person2));
+            lsnr3.assertNextCacheEvent(EventType.UPDATED, 0, person2);
+            lsnr4.assertNextCacheEvent(EventType.UPDATED, 0, 
client.binary().toBinary(person2));
+        }
+    }
+
+    /** Test continuous queries and JCache entry listeners with remote 
filters. */
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testListenersWithRemoteFilter() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testListenersWithRmtFilter");
+
+            CacheEntryEventSerializableFilter<Integer, Integer> rmtFilter = 
evt -> (evt.getKey() & 1) == 0;
+
+            ContinuousQueryListener<Integer, Integer> lsnr1 = new 
ContinuousQueryListener<>();
+            ContinuousQueryListener<Integer, Integer> lsnr2 = new 
ContinuousQueryListener<>();
+
+            cache.query(new ContinuousQuery<Integer, 
Integer>().setLocalListener(lsnr1)
+                .setRemoteFilterFactory(() -> rmtFilter));
+
+            cache.query(new ContinuousQuery<Integer, 
Integer>().setLocalListener(lsnr2)
+                .setRemoteFilter(rmtFilter));
+
+            JCacheEntryListener<Integer, Integer> lsnr3 = new 
JCacheEntryListener<>();
+
+            cache.registerCacheEntryListener(new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr3, () -> rmtFilter, true, false));
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, i);
+
+            Map<EventType, Map<Integer, Integer>> expRes = 
F.asMap(EventType.CREATED,
+                IntStream.range(0, 5).boxed().collect(Collectors.toMap(i -> i 
* 2, i -> i * 2)));
+
+            assertEquals(expRes, aggregateListenerEvents(lsnr1, 5));
+            assertEquals(expRes, aggregateListenerEvents(lsnr2, 5));
+            assertEquals(expRes, aggregateListenerEvents(lsnr3, 5));
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, -i);
+
+            expRes = F.asMap(EventType.UPDATED,
+                IntStream.range(0, 5).boxed().collect(Collectors.toMap(i -> i 
* 2, i -> -i * 2)));
+
+            assertEquals(expRes, aggregateListenerEvents(lsnr1, 5));
+            assertEquals(expRes, aggregateListenerEvents(lsnr2, 5));
+            assertEquals(expRes, aggregateListenerEvents(lsnr3, 5));
+
+            for (int i = 0; i < 10; i++)
+                cache.remove(i);
+
+            expRes = F.asMap(EventType.REMOVED,
+                IntStream.range(0, 5).boxed().collect(Collectors.toMap(i -> i 
* 2, i -> -i * 2)));
+
+            assertEquals(expRes, aggregateListenerEvents(lsnr1, 5));
+            assertEquals(expRes, aggregateListenerEvents(lsnr2, 5));
+            assertEquals(expRes, aggregateListenerEvents(lsnr3, 5));
+
+            assertTrue(lsnr1.isQueueEmpty());
+            assertTrue(lsnr2.isQueueEmpty());
+            assertTrue(lsnr3.isQueueEmpty());
+        }
+    }
+
+    /** Test disconnect event for cache entry listeners. */
+    @Test
+    public void testDisconnectListeners() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Object, Object> cache = 
client.getOrCreateCache("testDisconnect");
+
+            ContinuousQueryListener<Object, Object> lsnr1 = new 
ContinuousQueryListener<>();
+
+            cache.query(new ContinuousQuery<>().setLocalListener(lsnr1), 
lsnr1);
+
+            JCacheEntryListener<Object, Object> lsnr2 = new 
JCacheEntryListener<>();
+
+            CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr2, null, true, false);
+
+            cache.registerCacheEntryListener(lsnrCfg, lsnr2);
+
+            cache.put(0, 0);
+
+            lsnr1.assertNextCacheEvent(EventType.CREATED, 0, 0);
+            lsnr2.assertNextCacheEvent(EventType.CREATED, 0, 0);
+
+            dropAllThinClientConnections();
+
+            // Can't detect channel failure until we send something to server.
+            cache.put(1, 1);
+
+            assertTrue(lsnr1.isQueueEmpty());
+            assertTrue(lsnr2.isQueueEmpty());
+
+            assertTrue(waitForCondition(lsnr1::isDisconnected, TIMEOUT));
+            assertTrue(waitForCondition(lsnr2::isDisconnected, TIMEOUT));
+
+            // Should be able to register the same listener on the same cache 
again.
+            cache.registerCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /** */
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testRegisterDeregisterListener() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            String cacheName = "registerListener";
+
+            ClientCache<Integer, Integer> cache0 = 
client.getOrCreateCache(cacheName);
+
+            CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
+                JCacheEntryListener::new,
+                null,
+                true,
+                false
+            );
+
+            cache0.registerCacheEntryListener(lsnrCfg);
+
+            ClientCache<Integer, Integer> cache1 = 
client.getOrCreateCache(cacheName + '2');
+
+            // Can register the same listener on another cache.
+            cache1.registerCacheEntryListener(lsnrCfg);
+
+            ClientCache<Integer, Integer> cache2 = client.cache(cacheName);
+
+            // Can't register the same listener on the same cache.
+            assertThrowsWithCause(() -> 
cache2.registerCacheEntryListener(lsnrCfg), IllegalStateException.class);
+
+            ClientCache<Integer, Integer> cache3 = client.cache(cacheName);
+
+            cache3.deregisterCacheEntryListener(lsnrCfg);
+
+            // Can register the listener after deregisteration.
+            cache2.registerCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /** */
+    @Test
+    @SuppressWarnings({"ThrowableNotThrown", "deprecation"})
+    public void testListenersUnsupportedParameters() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testUnsupportedParams");
+
+            // Check null listener factory.
+            CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg1 = new 
MutableCacheEntryListenerConfiguration<>(
+                null,
+                null,
+                true,
+                false
+            );
+
+            assertThrowsWithCause(() -> 
cache.registerCacheEntryListener(lsnrCfg1), NullPointerException.class);
+
+            // Check synchronous flag.
+            CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg2 = new 
MutableCacheEntryListenerConfiguration<>(
+                JCacheEntryListener::new,
+                null,
+                true,
+                true
+            );
+
+            assertThrowsWithCause(() -> 
cache.registerCacheEntryListener(lsnrCfg2), IllegalArgumentException.class);
+
+            // Check local flag.
+            ContinuousQueryListener<Integer, Integer> cqLsnr = new 
ContinuousQueryListener<>();
+
+            ContinuousQuery<Integer, Integer> qry1 = new 
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+                .setLocal(true);
+
+            assertThrowsWithCause(() -> cache.query(qry1), 
IllegalArgumentException.class);
+
+            // Check null listener.
+            ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+            assertThrowsWithCause(() -> cache.query(qry2), 
NullPointerException.class);
+
+            // Check auto unsubscribe flag.
+            ContinuousQuery<Integer, Integer> qry3 = new 
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+                .setAutoUnsubscribe(false);
+
+            assertThrowsWithCause(() -> cache.query(qry3), 
IllegalArgumentException.class);
+
+            // Check continuous query as initial query.
+            ContinuousQuery<Integer, Integer> qry4 = new 
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+                .setInitialQuery(new ContinuousQuery<>());
+
+            assertThrowsWithCause(() -> cache.query(qry4), 
IllegalArgumentException.class);
+
+            // Check filter factory and filter defined at the same time.
+            CacheEntryEventSerializableFilter<Integer, Integer> rmtFilter = r 
-> true;
+
+            ContinuousQuery<Integer, Integer> qry5 = new 
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+                .setRemoteFilter(rmtFilter);
+
+            qry5.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFilter));
+
+            assertThrowsWithCause(() -> cache.query(qry5), 
IllegalArgumentException.class);
+        }
+    }
+
+    /** */
+    @Test
+    public void testListenersClose() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testListenersClose");
+
+            ContinuousQueryListener<Integer, Integer> lsnr1 = new 
ContinuousQueryListener<>();
+
+            QueryCursor<?> qry = cache.query(new ContinuousQuery<Integer, 
Integer>().setLocalListener(lsnr1));
+
+            JCacheEntryListener<Integer, Integer> lsnr2 = new 
JCacheEntryListener<>();
+
+            CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new 
MutableCacheEntryListenerConfiguration<>(
+                () -> lsnr2, null, true, false);
+
+            cache.registerCacheEntryListener(lsnrCfg);
+
+            cache.put(0, 0);
+
+            lsnr1.assertNextCacheEvent(EventType.CREATED, 0, 0);
+            lsnr2.assertNextCacheEvent(EventType.CREATED, 0, 0);
+
+            qry.close();
+            cache.deregisterCacheEntryListener(lsnrCfg);
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, i);
+
+            assertTrue(lsnr1.isQueueEmpty());
+            assertTrue(lsnr2.isQueueEmpty());
+        }
+    }
+
+    /** */
+    @Test
+    public void testContinuousQueriesWithConcurrentCompute() throws Exception {
+        try (IgniteClient client = startClient(0, 1, 2)) {
+            int threadsCnt = 20;
+            int iterations = 50;
+
+            Set<UUID> allNodesIds = new 
HashSet<>(F.nodeIds(grid(0).cluster().nodes()));
+
+            AtomicInteger threadIdxs = new AtomicInteger();
+
+            GridTestUtils.runMultiThreaded(
+                () -> {
+                    int threadIdx = threadIdxs.incrementAndGet();
+
+                    ClientCache<Integer, Integer> cache = 
client.getOrCreateCache("testCQwithCompute" + threadIdx);
+
+                    try {
+                        for (int i = 0; i < iterations; i++) {
+                            ContinuousQueryListener<Integer, Integer> lsnr = 
new ContinuousQueryListener<>();
+
+                            QueryCursor<?> cur = cache.query(new 
ContinuousQuery<Integer, Integer>()
+                                .setLocalListener(lsnr));
+
+                            cache.put(i, i);
+
+                            Future<T2<UUID, Set<UUID>>> fut = 
client.compute().executeAsync2(TestTask.class.getName(),
+                                null);
+
+                            assertEquals(allNodesIds, fut.get().get2());
+
+                            lsnr.assertNextCacheEvent(EventType.CREATED, i, i);
+
+                            assertTrue(lsnr.isQueueEmpty());
+
+                            cur.close();
+                        }
+                    }
+                    catch (Exception e) {
+                        log.error("Failure: ", e);
+
+                        fail();
+                    }
+                }, threadsCnt, "run-task-async"
+            );
+        }
+    }
+
+    /** */
+    private static <K, V> Map<EventType, Map<K, V>> 
aggregateListenerEvents(ContinuousQueryListener<K, V> lsnr,
+        int evtsCnt) throws Exception {
+        Map<EventType, Map<K, V>> res = new EnumMap<>(EventType.class);
+
+        for (int i = 0; i < evtsCnt; i++) {
+            CacheEntryEvent<? extends K, ? extends V> evt = lsnr.poll();
+
+            Map<K, V> locMap = res.computeIfAbsent(evt.getEventType(), k -> 
new HashMap<>());
+
+            locMap.put(evt.getKey(), evt.getValue());
+        }
+
+        return res;
+    }
+
+    /** */
+    private static class ContinuousQueryListener<K, V> implements 
CacheEntryUpdatedListener<K, V>,
+        ClientDisconnectListener {
+        /** Local entries map. */
+        private final BlockingQueue<CacheEntryEvent<? extends K, ? extends V>> 
evtsQ =
+            new LinkedBlockingQueue<>();
+
+        /** Disconnected flag. */
+        private volatile boolean disconnected;
+
+        /** Failure. */
+        private volatile Exception failure;
+
+        /**
+         * @param expectedEvtType Expected event type ({@code null} for any 
event type).
+         * @param evt Event.
+         */
+        protected void addEvent(EventType expectedEvtType, CacheEntryEvent<? 
extends K, ? extends V> evt) {
+            if (expectedEvtType != null && evt.getEventType() != 
expectedEvtType)
+                failure = new Exception("Unexpected event type [expEvtType=" + 
expectedEvtType + ", evt=" + evt + ']');
+            else
+                evtsQ.add(evt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, 
? extends V>> events) {
+            events.forEach(evt -> addEvent(null, evt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(Exception reason) {
+            disconnected = true;
+        }
+
+        /**
+         * Poll next cache event.
+         */
+        public CacheEntryEvent<? extends K, ? extends V> poll(long timeout) 
throws Exception {
+            if (failure != null)
+                throw failure;
+
+            CacheEntryEvent<? extends K, ? extends V> evt = 
evtsQ.poll(timeout, TimeUnit.MILLISECONDS);
+
+            assertNotNull(evt);
+
+            return evt;
+        }
+
+        /**
+         * Poll next cache event.
+         */
+        public CacheEntryEvent<? extends K, ? extends V> poll() throws 
Exception {
+            return poll(TIMEOUT);
+        }
+
+        /**
+         * Assert parameters of the next cache event.
+         */
+        public void assertNextCacheEvent(EventType expType, K expKey) throws 
Exception {
+            CacheEntryEvent<? extends K, ? extends V> evt = poll();
+            assertEquals(expType, evt.getEventType());
+            assertEquals(expKey, evt.getKey());
+        }
+
+        /**
+         * Assert parameters of the next cache event.
+         */
+        public void assertNextCacheEvent(EventType expType, K expKey, V 
expVal) throws Exception {
+            CacheEntryEvent<? extends K, ? extends V> evt = poll();
+            assertEquals(expType, evt.getEventType());
+            assertEquals(expKey, evt.getKey());
+            assertEquals(expVal, evt.getValue());
+        }
+
+        /** */
+        public boolean isDisconnected() {
+            return disconnected;
+        }
+
+        /** */
+        public boolean isQueueEmpty() {
+            return evtsQ.isEmpty();
+        }
+    }
+
+    /** */
+    private static class JCacheEntryListener<K, V> extends 
ContinuousQueryListener<K, V> implements
+        CacheEntryCreatedListener<K, V>, CacheEntryRemovedListener<K, V>, 
CacheEntryExpiredListener<K, V> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, 
? extends V>> events) {
+            events.forEach(evt -> addEvent(EventType.CREATED, evt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, 
? extends V>> events) {
+            events.forEach(evt -> addEvent(EventType.UPDATED, evt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, 
? extends V>> events) {
+            events.forEach(evt -> addEvent(EventType.REMOVED, evt));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, 
? extends V>> events) {
+            events.forEach(evt -> addEvent(EventType.EXPIRED, evt));
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 69d6fda..44b6425 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -35,8 +35,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientClusterGroup;
@@ -51,11 +49,8 @@ import org.apache.ignite.compute.ComputeTaskName;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.ThinClientConfiguration;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.mxbean.ClientProcessorMXBean;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
@@ -647,18 +642,6 @@ public class ComputeTaskTest extends 
AbstractThinClientTest {
     }
 
     /**
-     *
-     */
-    private void dropAllThinClientConnections() {
-        for (Ignite ignite : G.allGrids()) {
-            ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
-                ClientListenerProcessor.class, ClientProcessorMXBean.class);
-
-            mxBean.dropAllConnections();
-        }
-    }
-
-    /**
      * Compute task with latch on routing node.
      */
     @ComputeTaskName("TestLatchTask")
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index f61083b..88ee8eb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -319,7 +319,7 @@ public class ReliableChannelTest {
         ClientBinaryMarshaller marsh = mock(ClientBinaryMarshaller.class);
         TcpClientTransactions transactions = mock(TcpClientTransactions.class);
 
-        TcpClientCache cache = new TcpClientCache("", rc, marsh, transactions, 
false, null);
+        TcpClientCache cache = new TcpClientCache("", rc, marsh, transactions, 
null, false, null);
 
         GridTestUtils.assertThrowsWithCause(() -> op.accept(cache), 
TestChannelException.class);
     }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 48d346f..85255ce 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.client;
 
 import org.apache.ignite.internal.client.thin.CacheAsyncTest;
+import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
 import org.apache.ignite.internal.client.thin.ClusterApiTest;
 import org.apache.ignite.internal.client.thin.ClusterGroupTest;
 import org.apache.ignite.internal.client.thin.ComputeTaskTest;
@@ -56,6 +57,7 @@ import org.junit.runners.Suite;
     ClusterApiTest.class,
     ClusterGroupTest.class,
     ServicesTest.class,
+    CacheEntryListenersTest.class,
     ThinClientPartitionAwarenessStableTopologyTest.class,
     ThinClientPartitionAwarenessUnstableTopologyTest.class,
     ThinClientPartitionAwarenessResourceReleaseTest.class,

Reply via email to