This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b65411e IGNITE-14402 Java thin client: Continuous queries support -
Fixes #8960.
b65411e is described below
commit b65411e68105c36dc1220d850d3f595df2f8af4b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Apr 8 13:31:08 2021 +0300
IGNITE-14402 Java thin client: Continuous queries support - Fixes #8960.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../clients/AbstractClientCompatibilityTest.java | 8 +
.../clients/JavaThinCompatibilityTest.java | 26 +
.../java/org/apache/ignite/client/ClientCache.java | 62 +-
.../ClientDisconnectListener.java} | 19 +-
.../thin/ClientCacheEntryListenerHandler.java | 265 +++++++
.../thin/ClientCacheEntryListenersRegistry.java | 61 ++
.../client/thin/ClientContinuousQueryCursor.java | 61 ++
.../internal/client/thin/ClientJCacheAdapter.java | 194 ++++++
.../thin/ClientJCacheEntryListenerAdapter.java | 70 ++
.../client/thin/ClientNotificationType.java | 21 +-
.../internal/client/thin/ClientOperation.java | 2 +
.../internal/client/thin/TcpClientCache.java | 128 +++-
.../internal/client/thin/TcpClientChannel.java | 16 +-
.../internal/client/thin/TcpIgniteClient.java | 23 +-
.../org/apache/ignite/client/ReliabilityTest.java | 14 -
.../client/thin/AbstractThinClientTest.java | 22 +
.../client/thin/CacheEntryListenersTest.java | 774 +++++++++++++++++++++
.../internal/client/thin/ComputeTaskTest.java | 17 -
.../internal/client/thin/ReliableChannelTest.java | 2 +-
.../org/apache/ignite/client/ClientTestSuite.java | 2 +
20 files changed, 1725 insertions(+), 62 deletions(-)
diff --git
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index 07ba238..c12cf1c 100644
---
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -57,6 +57,12 @@ public abstract class AbstractClientCompatibilityTest
extends IgniteCompatibilit
/** Version 2.9.0. */
protected static final IgniteProductVersion VER_2_9_0 =
IgniteProductVersion.fromString("2.9.0");
+ /** Version 2.10.0. */
+ protected static final IgniteProductVersion VER_2_10_0 =
IgniteProductVersion.fromString("2.10.0");
+
+ /** Version 2.11.0. */
+ protected static final IgniteProductVersion VER_2_11_0 =
IgniteProductVersion.fromString("2.11.0");
+
/** Ignite versions to test. Note: Only released versions or current
version should be included to this list. */
protected static final String[] TESTED_IGNITE_VERSIONS = new String[] {
"2.4.0",
@@ -68,6 +74,8 @@ public abstract class AbstractClientCompatibilityTest extends
IgniteCompatibilit
"2.8.0",
"2.8.1",
"2.9.0",
+ "2.9.1",
+ "2.10.0",
IgniteVersionUtils.VER_STR
};
diff --git
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 0b870bf..3aa6544 100644
---
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.compatibility.clients;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
@@ -32,6 +34,7 @@ import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
@@ -52,6 +55,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
@@ -313,6 +317,25 @@ public class JavaThinCompatibilityTest extends
AbstractClientCompatibilityTest {
}
}
+ /**
+ * Checks that a continuous query started through the thin client receives one event for each
+ * of the create, update and remove operations performed on the cache.
+ *
+ * @throws Exception If failed.
+ */
+ private void testContinuousQueries() throws Exception {
+ X.println(">>>> Testing continuous queries");
+
+ try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(ADDR))) {
+ ClientCache<Object, Object> cache = client.getOrCreateCache("testContinuousQueries");
+
+ // Events arrive asynchronously (hence the wait below), so the list is appended to by the
+ // listener while this thread polls its size. A plain ArrayList gives no visibility guarantee.
+ List<CacheEntryEvent<?, ?>> allEvts = java.util.Collections.synchronizedList(new ArrayList<>());
+
+ cache.query(new ContinuousQuery<>().setLocalListener(evts -> evts.forEach(allEvts::add)));
+
+ cache.put(0, 0);
+ cache.put(0, 1);
+ cache.remove(0);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> allEvts.size() == 3, 1_000L));
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void testClient(IgniteProductVersion clientVer,
IgniteProductVersion serverVer) throws Exception {
IgniteProductVersion minVer = clientVer.compareTo(serverVer) < 0 ?
clientVer : serverVer;
@@ -345,6 +368,9 @@ public class JavaThinCompatibilityTest extends
AbstractClientCompatibilityTest {
testCompute();
testServices();
}
+
+ if (clientVer.compareTo(VER_2_11_0) >= 0 &&
serverVer.compareTo(VER_2_10_0) >= 0)
+ testContinuousQueries();
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
index f1450c7..60bc28d 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
@@ -22,9 +22,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
-
import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
@@ -703,19 +705,73 @@ public interface ClientCache<K, V> {
public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy
expirePlc);
/**
- * Queries cache. Supports {@link ScanQuery} and {@link SqlFieldsQuery}.
+ * Queries cache. Supports {@link ScanQuery}, {@link SqlFieldsQuery} and
{@link ContinuousQuery}.
+ * <p>
+ * NOTE: For continuous query listeners there is no failover in case of
client channel failure, this event should
+ * be handled on the user's side. Use {@link #query(ContinuousQuery,
ClientDisconnectListener)} method to get
+ * notified about client disconnected event via {@link
ClientDisconnectListener} interface if you need it.
*
* @param qry Query.
* @return Cursor.
- *
*/
public <R> QueryCursor<R> query(Query<R> qry);
/**
+ * Start {@link ContinuousQuery} on the cache.
+ * <p>
+ * NOTE: There is no failover in case of client channel failure, this
event should be handled on the user's side.
+ * Use {@code disconnectListener} to handle this.
+ *
+ * @param qry Query.
+ * @param disconnectListener Listener of client disconnected event.
+ * @return Cursor.
+ */
+ public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry,
ClientDisconnectListener disconnectListener);
+
+ /**
* Convenience method to execute {@link SqlFieldsQuery}.
*
* @param qry Query.
* @return Cursor.
*/
public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry);
+
+ /**
+ * Registers a {@link CacheEntryListener}. The supplied {@link
CacheEntryListenerConfiguration} is used to
+ * instantiate a listener and apply it to those events specified in the
configuration.
+ * <p>
+ * NOTE: There is no failover in case of client channel failure, this
event should be handled on the user's side.
+ * Use {@link #registerCacheEntryListener(CacheEntryListenerConfiguration,
ClientDisconnectListener)} method to get
+ * notified about client disconnected event via {@link
ClientDisconnectListener} interface if you need it.
+ *
+ * @param cacheEntryListenerConfiguration a factory and related
configuration for creating the listener.
+ * @throws IllegalArgumentException if the same CacheEntryListenerConfiguration is used more than once or
+ * if some properties not supported by the thin client are set.
+ * @see CacheEntryListener
+ */
+ public void registerCacheEntryListener(CacheEntryListenerConfiguration<K,
V> cacheEntryListenerConfiguration);
+
+ /**
+ * Registers a {@link CacheEntryListener}. The supplied {@link
CacheEntryListenerConfiguration} is used to
+ * instantiate a listener and apply it to those events specified in the
configuration.
+ * <p>
+ * NOTE: There is no failover in case of client channel failure, this
event should be handled on the user's side.
+ * Use {@code disconnectListener} to handle this.
+ *
+ * @param cacheEntryListenerConfiguration a factory and related
configuration for creating the listener.
+ * @param disconnectListener Listener of client disconnected event.
+ * @throws IllegalArgumentException if the same CacheEntryListenerConfiguration is used more than once or
+ * if some properties not supported by the thin client are set.
+ * @see CacheEntryListener
+ */
+ public void registerCacheEntryListener(CacheEntryListenerConfiguration<K,
V> cacheEntryListenerConfiguration,
+ ClientDisconnectListener disconnectListener);
+
+ /**
+ * Deregisters a listener, using the {@link
CacheEntryListenerConfiguration} that was used to register it.
+ *
+ * @param cacheEntryListenerConfiguration the factory and related
configuration that was used to create the
+ * listener.
+ */
+ public void
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V>
cacheEntryListenerConfiguration);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientDisconnectListener.java
similarity index 54%
copy from
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
copy to
modules/core/src/main/java/org/apache/ignite/client/ClientDisconnectListener.java
index f792908..e2c76b8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
+++
b/modules/core/src/main/java/org/apache/ignite/client/ClientDisconnectListener.java
@@ -15,12 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.client;
+
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import org.apache.ignite.cache.query.ContinuousQuery;
/**
- * Notification types.
+ * Client disconnected event listener. Such listeners can be used in {@link
ClientCache#query(ContinuousQuery,
+ * ClientDisconnectListener)} or {@link
ClientCache#registerCacheEntryListener(CacheEntryListenerConfiguration,
+ * ClientDisconnectListener)} methods to handle client channel failure.
*/
-enum ClientNotificationType {
- /** Compute task finished. */
- COMPUTE_TASK_FINISHED;
+public interface ClientDisconnectListener {
+ /**
+ * Client disconnected callback.
+ *
+ * @param reason Exception that caused the disconnect, can be {@code null}.
+ */
+ public void onDisconnected(Exception reason);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
new file mode 100644
index 0000000..3f27881
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.client.ClientDisconnectListener;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static
org.apache.ignite.internal.client.thin.ClientNotificationType.CONTINUOUS_QUERY_EVENT;
+import static
org.apache.ignite.internal.client.thin.TcpClientCache.JAVA_PLATFORM;
+
+/**
+ * Handler for {@link ContinuousQuery} listeners and JCache cache entry
listeners.
+ */
+public class ClientCacheEntryListenerHandler<K, V> implements
NotificationListener, AutoCloseable {
+ /** "Keep binary" flag mask. */
+ private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
+
+ /** */
+ private final Cache<K, V> jCacheAdapter;
+
+ /** */
+ private final ReliableChannel ch;
+
+ /** */
+ private final boolean keepBinary;
+
+ /** */
+ private final ClientUtils utils;
+
+ /** */
+ private volatile CacheEntryUpdatedListener<K, V> locLsnr;
+
+ /** */
+ private volatile ClientDisconnectListener disconnectLsnr;
+
+ /** */
+ private volatile ClientChannel clientCh;
+
+ /** */
+ private volatile Long rsrcId;
+
+ /** */
+ ClientCacheEntryListenerHandler(
+ Cache<K, V> jCacheAdapter,
+ ReliableChannel ch,
+ ClientBinaryMarshaller marsh,
+ boolean keepBinary
+ ) {
+ this.jCacheAdapter = jCacheAdapter;
+ this.ch = ch;
+ this.keepBinary = keepBinary;
+ utils = new ClientUtils(marsh);
+ }
+
+ /**
+ * Sends a continuous query request to the server and starts listening for cache entry events.
+ */
+ public synchronized void startListen(
+ CacheEntryUpdatedListener<K, V> locLsnr,
+ ClientDisconnectListener disconnectLsnr,
+ Factory<? extends CacheEntryEventFilter<? super K, ? super V>>
rmtFilterFactory,
+ int pageSize,
+ long timeInterval,
+ boolean includeExpired
+ ) {
+ assert locLsnr != null;
+
+ if (clientCh != null)
+ throw new IllegalStateException("Listener was already started");
+
+ this.locLsnr = locLsnr;
+ this.disconnectLsnr = disconnectLsnr;
+
+ Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+ BinaryOutputStream out = payloadCh.out();
+
+ out.writeInt(ClientUtils.cacheId(jCacheAdapter.getName()));
+ out.writeByte(keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
+ out.writeInt(pageSize);
+ out.writeLong(timeInterval);
+ out.writeBoolean(includeExpired);
+
+ if (rmtFilterFactory == null)
+ out.writeByte(GridBinaryMarshaller.NULL);
+ else {
+ utils.writeObject(out, rmtFilterFactory);
+ out.writeByte(JAVA_PLATFORM);
+ }
+ };
+
+ Function<PayloadInputChannel, T2<ClientChannel, Long>> qryReader =
payloadCh -> {
+ ClientChannel ch = payloadCh.clientChannel();
+ Long rsrcId = payloadCh.in().readLong();
+
+ ch.addNotificationListener(CONTINUOUS_QUERY_EVENT, rsrcId, this);
+
+ return new T2<>(ch, rsrcId);
+ };
+
+ try {
+ T2<ClientChannel, Long> params =
ch.service(ClientOperation.QUERY_CONTINUOUS, qryWriter, qryReader);
+
+ clientCh = params.get1();
+ rsrcId = params.get2();
+ }
+ catch (ClientError e) {
+ throw new ClientException(e);
+ }
+ }
+
+ /**
+ * Decodes a batch of cache entry events from the notification payload and passes it to the local listener.
+ * <p>
+ * Notifications carrying an error or no payload are ignored. If an event with an unknown type code is
+ * encountered, the failure is reported through {@link #onChannelClosed(Exception)} and the whole batch
+ * is dropped, so the local listener never sees an event without a type.
+ *
+ * @param payload Notification payload, can be {@code null}.
+ * @param err Notification error, can be {@code null}.
+ */
+ @Override public void acceptNotification(ByteBuffer payload, Exception err) {
+ if (err != null || payload == null)
+ return;
+
+ BinaryInputStream in = BinaryByteBufferInputStream.create(payload);
+
+ int cnt = in.readInt();
+
+ List<CacheEntryEvent<? extends K, ? extends V>> evts = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ K key = utils.readObject(in, keepBinary);
+ V oldVal = utils.readObject(in, keepBinary);
+ V val = utils.readObject(in, keepBinary);
+ byte evtTypeByte = in.readByte();
+
+ EventType evtType = eventType(evtTypeByte);
+
+ if (evtType == null) {
+ onChannelClosed(new ClientException("Unknown event type: " + evtTypeByte));
+
+ // The handler has just been closed: do not build an event with a null type
+ // and do not deliver anything from this batch.
+ return;
+ }
+
+ evts.add(new CacheEntryEventImpl<>(jCacheAdapter, evtType, key, oldVal, val));
+ }
+
+ locLsnr.onUpdated(evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onChannelClosed(Exception reason) {
+ ClientDisconnectListener lsnr = disconnectLsnr;
+
+ if (lsnr != null)
+ lsnr.onDisconnected(reason);
+
+ U.closeQuiet(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() {
+ ClientChannel clientCh = this.clientCh;
+
+ if (clientCh != null && !clientCh.closed()) {
+ clientCh.removeNotificationListener(CONTINUOUS_QUERY_EVENT,
rsrcId);
+
+ clientCh.service(ClientOperation.RESOURCE_CLOSE, ch ->
ch.out().writeLong(rsrcId), null);
+ }
+ }
+
+ /**
+ * Client channel.
+ */
+ public ClientChannel clientChannel() {
+ return clientCh;
+ }
+
+ /** */
+ private EventType eventType(byte evtTypeByte) {
+ switch (evtTypeByte) {
+ case 0: return EventType.CREATED;
+ case 1: return EventType.UPDATED;
+ case 2: return EventType.REMOVED;
+ case 3: return EventType.EXPIRED;
+ default: return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CacheEntryEventImpl<K, V> extends CacheEntryEvent<K,
V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Key. */
+ private final K key;
+
+ /** Old value. */
+ private final V oldVal;
+
+ /** Value. */
+ private final V val;
+
+ /**
+ *
+ */
+ private CacheEntryEventImpl(Cache<K, V> src, EventType evtType, K key,
V oldVal, V val) {
+ super(src, evtType);
+
+ this.key = key;
+ this.oldVal = oldVal;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public K getKey() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getValue() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getOldValue() {
+ return oldVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isOldValueAvailable() {
+ return oldVal != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unwrap(Class<T> clazz) {
+ if (clazz.isAssignableFrom(getClass()))
+ return clazz.cast(this);
+
+ throw new IllegalArgumentException("Unwrapping to class is not
supported: " + clazz);
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
new file mode 100644
index 0000000..07bb799
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenersRegistry.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+
+/**
+ * Per-cache cache entry listeners registry. Listeners can't be stored inside
ClientCache instance, since there can be
+ * several such instances per one cache.
+ */
+public class ClientCacheEntryListenersRegistry {
+ /** */
+ private final Map<String, Map<CacheEntryListenerConfiguration<?, ?>,
+ ClientCacheEntryListenerHandler<?, ?>>> lsnrs = new
ConcurrentHashMap<>();
+
+ /**
+ * Register listener handler.
+ *
+ * @return {@code True} if listener was successfully registered,
+ * {@code false} if listener was already registered before.
+ */
+ public boolean registerCacheEntryListener(String cacheName,
CacheEntryListenerConfiguration<?, ?> cfg,
+ ClientCacheEntryListenerHandler<?, ?> hnd) {
+ Map<CacheEntryListenerConfiguration<?, ?>,
ClientCacheEntryListenerHandler<?, ?>> cacheLsnrs =
+ lsnrs.computeIfAbsent(cacheName, k -> new ConcurrentHashMap<>());
+
+ ClientCacheEntryListenerHandler<?, ?> old =
cacheLsnrs.putIfAbsent(cfg, hnd);
+
+ return old == null;
+ }
+
+ /**
+ * Deregister listener handler.
+ *
+ * @return Listener handler.
+ */
+ public ClientCacheEntryListenerHandler<?, ?>
deregisterCacheEntryListener(String cacheName,
+ CacheEntryListenerConfiguration<?, ?> cfg) {
+ Map<CacheEntryListenerConfiguration<?, ?>,
ClientCacheEntryListenerHandler<?, ?>> cacheLsnrs =
+ lsnrs.get(cacheName);
+
+ return cacheLsnrs != null ? cacheLsnrs.remove(cfg) : null;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientContinuousQueryCursor.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientContinuousQueryCursor.java
new file mode 100644
index 0000000..0ff1f41
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientContinuousQueryCursor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Thin client continuous query cursor.
+ */
+class ClientContinuousQueryCursor<T> implements QueryCursor<T> {
+ /** Initial query cursor. */
+ private final QueryCursor<T> initQryCursor;
+
+ /** Cache entry listener handler. */
+ private final ClientCacheEntryListenerHandler<?, ?> lsnrHnd;
+
+ /**
+ * @param initQryCursor Initial query cursor.
+ * @param lsnrHnd Cache entry listener handler.
+ */
+ ClientContinuousQueryCursor(QueryCursor<T> initQryCursor,
ClientCacheEntryListenerHandler<?, ?> lsnrHnd) {
+ this.initQryCursor = initQryCursor;
+ this.lsnrHnd = lsnrHnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<T> getAll() {
+ return initQryCursor == null ? Collections.emptyList() :
initQryCursor.getAll();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ U.closeQuiet(initQryCursor);
+ U.closeQuiet(lsnrHnd);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Iterator<T> iterator() {
+ return initQryCursor == null ? Collections.emptyIterator() :
initQryCursor.iterator();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
new file mode 100644
index 0000000..45d3cf6
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.Cache;
+import javax.cache.CacheManager;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Configuration;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.client.ClientCache;
+
+/**
+ * Thin client cache to JCache compatible cache adapter.
+ */
+class ClientJCacheAdapter<K, V> implements Cache<K, V> {
+ /** Delegate. */
+ private final ClientCache<K, V> delegate;
+
+ /**
+ * @param delegate Delegate.
+ */
+ ClientJCacheAdapter(ClientCache<K, V> delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V get(K key) {
+ return delegate.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> getAll(Set<? extends K> keys) {
+ return delegate.getAll(keys);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsKey(K key) {
+ return delegate.containsKey(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadAll(Set<? extends K> keys, boolean
replaceExistingValues,
+ CompletionListener completionListener) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(K key, V val) {
+ delegate.put(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndPut(K key, V val) {
+ return delegate.getAndPut(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(Map<? extends K, ? extends V> map) {
+ delegate.putAll(map);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean putIfAbsent(K key, V val) {
+ return delegate.putIfAbsent(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(K key) {
+ return delegate.remove(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(K key, V oldVal) {
+ return delegate.remove(key, oldVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndRemove(K key) {
+ return delegate.getAndRemove(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(K key, V oldVal, V newVal) {
+ return delegate.replace(key, oldVal, newVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(K key, V val) {
+ return delegate.replace(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndReplace(K key, V val) {
+ return delegate.getAndReplace(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll(Set<? extends K> keys) {
+ delegate.removeAll(keys);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll() {
+ delegate.removeAll();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ delegate.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <C extends Configuration<K, V>> C
getConfiguration(Class<C> clazz) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProc,
+ Object... arguments) throws EntryProcessorException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<?
extends K> keys,
+ EntryProcessor<K, V, T> entryProc, Object... arguments) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return delegate.getName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheManager getCacheManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClosed() {
+ return false;
+ }
+
+ /**
+ * Unwraps this adapter to the underlying thin client cache.
+ *
+ * @param clazz Requested type.
+ * @return The delegate cast to the requested type.
+ * @throws IllegalArgumentException If the delegate is not an instance of the requested type.
+ */
+ @Override public <T> T unwrap(Class<T> clazz) {
+ // Class.cast avoids the unchecked (T) cast; the assignability check above it guarantees success.
+ if (clazz.isAssignableFrom(delegate.getClass()))
+ return clazz.cast(delegate);
+
+ throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registerCacheEntryListener(
+ CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration)
{
+ delegate.registerCacheEntryListener(cacheEntryListenerConfiguration);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deregisterCacheEntryListener(
+ CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration)
{
+ delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Entry<K, V>> iterator() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheEntryListenerAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheEntryListenerAdapter.java
new file mode 100644
index 0000000..742af38
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheEntryListenerAdapter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+
+/**
+ * Routes continuous query notifications to a JCache entry listener.
+ * <p>
+ * Events of every type arrive through {@link #onUpdated(Iterable)}; each one is forwarded individually to the
+ * callback of the wrapped listener that matches the event type.
+ */
+class ClientJCacheEntryListenerAdapter<K, V> implements CacheEntryUpdatedListener<K, V> {
+    /** Target for {@code CREATED} events, a no-op stub if the wrapped listener does not handle them. */
+    private final CacheEntryCreatedListener<K, V> createdTarget;
+
+    /** Target for {@code UPDATED} events, a no-op stub if the wrapped listener does not handle them. */
+    private final CacheEntryUpdatedListener<K, V> updatedTarget;
+
+    /** Target for {@code REMOVED} events, a no-op stub if the wrapped listener does not handle them. */
+    private final CacheEntryRemovedListener<K, V> removedTarget;
+
+    /** Target for {@code EXPIRED} events, a no-op stub if the wrapped listener does not handle them. */
+    private final CacheEntryExpiredListener<K, V> expiredTarget;
+
+    /**
+     * @param impl JCache listener to notify; it receives only the event kinds whose listener interfaces it implements.
+     */
+    ClientJCacheEntryListenerAdapter(CacheEntryListener<? super K, ? super V> impl) {
+        if (impl instanceof CacheEntryCreatedListener)
+            createdTarget = (CacheEntryCreatedListener<K, V>)impl;
+        else
+            createdTarget = evts -> {};
+
+        if (impl instanceof CacheEntryUpdatedListener)
+            updatedTarget = (CacheEntryUpdatedListener<K, V>)impl;
+        else
+            updatedTarget = evts -> {};
+
+        if (impl instanceof CacheEntryRemovedListener)
+            removedTarget = (CacheEntryRemovedListener<K, V>)impl;
+        else
+            removedTarget = evts -> {};
+
+        if (impl instanceof CacheEntryExpiredListener)
+            expiredTarget = (CacheEntryExpiredListener<K, V>)impl;
+        else
+            expiredTarget = evts -> {};
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) {
+        for (CacheEntryEvent<? extends K, ? extends V> evt : evts)
+            dispatch(evt);
+    }
+
+    /**
+     * Delivers a single event to the callback matching its type.
+     *
+     * @param evt Event to deliver.
+     */
+    private void dispatch(CacheEntryEvent<? extends K, ? extends V> evt) {
+        try {
+            // JCache callbacks take a batch, so wrap the single event.
+            Iterable<CacheEntryEvent<? extends K, ? extends V>> single = Collections.singleton(evt);
+
+            switch (evt.getEventType()) {
+                case CREATED:
+                    createdTarget.onCreated(single);
+
+                    break;
+
+                case UPDATED:
+                    updatedTarget.onUpdated(single);
+
+                    break;
+
+                case REMOVED:
+                    removedTarget.onRemoved(single);
+
+                    break;
+
+                case EXPIRED:
+                    expiredTarget.onExpired(single);
+
+                    break;
+            }
+        }
+        catch (Exception ignored) {
+            // Ignore exceptions in user code.
+        }
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
index f792908..1a716ac 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientNotificationType.java
@@ -21,6 +21,25 @@ package org.apache.ignite.internal.client.thin;
* Notification types.
*/
enum ClientNotificationType {
+ /** Continuous query event. */
+ CONTINUOUS_QUERY_EVENT(false),
+
/** Compute task finished. */
- COMPUTE_TASK_FINISHED;
+ COMPUTE_TASK_FINISHED(true);
+
+ /** */
+ private final boolean keepNotificationsWithoutListener;
+
+ /** */
+ ClientNotificationType(boolean keepNotificationsWithoutListener) {
+ this.keepNotificationsWithoutListener =
keepNotificationsWithoutListener;
+ }
+
+ /**
+ * @return {@code True} if it's required to save all received
notifications when listener for this notification
+ * is not yet registered. These notifications will be processed when
listener for their resource will be registered.
+ */
+ public boolean keepNotificationsWithoutListener() {
+ return keepNotificationsWithoutListener;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index 987345d..1abb51a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -61,6 +61,8 @@ enum ClientOperation {
/** Query sql cursor get page. */QUERY_SQL_CURSOR_GET_PAGE(2003),
/** Query sql fields. */QUERY_SQL_FIELDS(2004),
/** Query sql fields cursor get page.
*/QUERY_SQL_FIELDS_CURSOR_GET_PAGE(2005),
+ /** Continuous query. */QUERY_CONTINUOUS(2006),
+ /** Continuous query event. */QUERY_CONTINUOUS_EVENT(2007,
ClientNotificationType.CONTINUOUS_QUERY_EVENT),
/** Get binary type. */GET_BINARY_TYPE(3002),
/** Register binary type name. */REGISTER_BINARY_TYPE_NAME(3001),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index cf9b70b..8857077 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -25,8 +25,13 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.cache.Cache;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
@@ -35,12 +40,15 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import
org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
@@ -59,6 +67,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
/** "With expiry policy" flag mask. */
private static final byte WITH_EXPIRY_POLICY_FLAG_MASK = 0x04;
+ /** Platform type: Java platform. */
+ static final byte JAVA_PLATFORM = 1;
+
/** Cache id. */
private final int cacheId;
@@ -83,24 +94,34 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
/** Expiry policy. */
private final ExpiryPolicy expiryPlc;
+ /** Cache entry listeners registry. */
+ private final ClientCacheEntryListenersRegistry lsnrsRegistry;
+
+ /** JCache adapter. */
+ private final Cache<K, V> jCacheAdapter;
+
/** Constructor. */
- TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions) {
- this(name, ch, marsh, transactions, false, null);
+ TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions,
+ ClientCacheEntryListenersRegistry lsnrsRegistry) {
+ this(name, ch, marsh, transactions, lsnrsRegistry, false, null);
}
/** Constructor. */
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller
marsh, TcpClientTransactions transactions,
- boolean keepBinary, ExpiryPolicy expiryPlc) {
+ ClientCacheEntryListenersRegistry lsnrsRegistry, boolean keepBinary,
ExpiryPolicy expiryPlc) {
this.name = name;
this.cacheId = ClientUtils.cacheId(name);
this.ch = ch;
this.marsh = marsh;
this.transactions = transactions;
+ this.lsnrsRegistry = lsnrsRegistry;
serDes = new ClientUtils(marsh);
this.keepBinary = keepBinary;
this.expiryPlc = expiryPlc;
+
+ jCacheAdapter = new ClientJCacheAdapter<>(this);
}
/** {@inheritDoc} */
@@ -697,12 +718,12 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
/** {@inheritDoc} */
@Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
return keepBinary ? (ClientCache<K1, V1>)this :
- new TcpClientCache<>(name, ch, marsh, transactions, true,
expiryPlc);
+ new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry,
true, expiryPlc);
}
/** {@inheritDoc} */
@Override public <K1, V1> ClientCache<K1, V1>
withExpirePolicy(ExpiryPolicy expirePlc) {
- return new TcpClientCache<>(name, ch, marsh, transactions, keepBinary,
expirePlc);
+ return new TcpClientCache<>(name, ch, marsh, transactions,
lsnrsRegistry, keepBinary, expirePlc);
}
/** {@inheritDoc} */
@@ -719,6 +740,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
res = (QueryCursor<R>)sqlQuery((SqlQuery)qry);
else if (qry instanceof SqlFieldsQuery)
res = (QueryCursor<R>)query((SqlFieldsQuery)qry);
+ else if (qry instanceof ContinuousQuery)
+ res = query((ContinuousQuery<K, V>)qry, null);
else
throw new IllegalArgumentException(
String.format("Query of type [%s] is not supported",
qry.getClass().getSimpleName())
@@ -747,6 +770,99 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
));
}
+    /** {@inheritDoc} */
+    @Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, ClientDisconnectListener disconnectLsnr) {
+        // Reject unsupported or inconsistent query settings before the handler is created.
+        A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial query for continuous query " +
+            "can't be an instance of another continuous query");
+        A.notNull(qry.getLocalListener(), "Local listener");
+        A.ensure(!qry.isLocal(), "Local query is not supported by thin client");
+        A.ensure(qry.isAutoUnsubscribe(), "AutoUnsubscribe flag is not supported by thin client");
+        A.ensure(!(qry.getRemoteFilterFactory() != null && qry.getRemoteFilter() != null),
+            "RemoteFilter and RemoteFilterFactory can't be used together");
+
+        ClientCacheEntryListenerHandler<K, V> hnd =
+            new ClientCacheEntryListenerHandler<>(jCacheAdapter, ch, marsh, keepBinary);
+
+        // The listener is started before the initial query is executed.
+        hnd.startListen(
+            qry.getLocalListener(),
+            disconnectLsnr,
+            // An explicit factory is used as is, a plain remote filter gets wrapped into a factory.
+            qry.getRemoteFilterFactory() != null
+                ? qry.getRemoteFilterFactory()
+                : qry.getRemoteFilter() != null ? FactoryBuilder.factoryOf(qry.getRemoteFilter()) : null,
+            qry.getPageSize(),
+            qry.getTimeInterval(),
+            qry.isIncludeExpired()
+        );
+
+        // Without an initial query the cursor wraps only the handler.
+        if (qry.getInitialQuery() == null)
+            return new ClientContinuousQueryCursor<>(null, hnd);
+
+        try {
+            QueryCursor<R> initCur = (QueryCursor<R>)query(qry.getInitialQuery());
+
+            return new ClientContinuousQueryCursor<>(initCur, hnd);
+        }
+        catch (Exception e) {
+            // The initial query failed: close the already started handler before propagating the error.
+            U.closeQuiet(hnd);
+
+            throw e;
+        }
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Same as the two-argument overload called with a {@code null} disconnect listener.
+ */
+ @Override public void
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
+ registerCacheEntryListener(cfg, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg,
+ ClientDisconnectListener disconnectLsnr) {
+ A.ensure(!cfg.isSynchronous(),
+ "Unsupported cfg.isSynchronous() flag value");
+
+ A.notNull(cfg.getCacheEntryListenerFactory(),
"cfg.getCacheEntryListenerFactory()");
+
+ ClientCacheEntryListenerHandler<K, V> hnd = new
ClientCacheEntryListenerHandler<>(
+ jCacheAdapter,
+ ch,
+ marsh,
+ keepBinary
+ );
+
+ if (lsnrsRegistry.registerCacheEntryListener(name, cfg, hnd)) {
+ CacheEntryListener<? super K, ? super V> locLsnr =
cfg.getCacheEntryListenerFactory().create();
+
+ ClientDisconnectListener disconnectLsnr0 = e -> {
+ if (disconnectLsnr != null)
+ disconnectLsnr.onDisconnected(e);
+
+ lsnrsRegistry.deregisterCacheEntryListener(name, cfg);
+ };
+
+ hnd.startListen(
+ new ClientJCacheEntryListenerAdapter<>(locLsnr),
+ disconnectLsnr0,
+ cfg.getCacheEntryEventFilterFactory(),
+ ContinuousQuery.DFLT_PAGE_SIZE,
+ ContinuousQuery.DFLT_TIME_INTERVAL,
+ locLsnr instanceof CacheEntryExpiredListener
+ );
+ }
+ else
+ throw new IllegalStateException("Listener is already registered
for configuration: " + cfg);
+ }
+
+    /** {@inheritDoc} */
+    @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
+        // Take the handler bound to this configuration out of the registry and close it quietly.
+        U.closeQuiet(lsnrsRegistry.deregisterCacheEntryListener(name, cfg));
+    }
+
/** Handle scan query. */
private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
@@ -758,7 +874,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
out.writeByte(GridBinaryMarshaller.NULL);
else {
serDes.writeObject(out, qry.getFilter());
- out.writeByte((byte)1); // Java platform
+ out.writeByte(JAVA_PLATFORM);
}
out.writeInt(qry.getPageSize());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 3cd8688..4246b07 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -153,8 +153,10 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
throws ClientConnectionException, ClientAuthenticationException,
ClientProtocolError {
validateConfiguration(cfg);
- for (ClientNotificationType type : ClientNotificationType.values())
- pendingNotifications[type.ordinal()] = new ConcurrentHashMap<>();
+ for (ClientNotificationType type : ClientNotificationType.values()) {
+ if (type.keepNotificationsWithoutListener())
+ pendingNotifications[type.ordinal()] = new
ConcurrentHashMap<>();
+ }
Executor cfgExec = cfg.getAsyncContinuationExecutor();
asyncContinuationExecutor = cfgExec != null ? cfgExec :
ForkJoinPool.commonPool();
@@ -447,7 +449,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (lsrns != null)
lsnr = lsrns.get(resId);
- if (lsnr == null) {
+ if (notificationType.keepNotificationsWithoutListener() &&
lsnr == null) {
pendingNotifications[notificationType.ordinal()].computeIfAbsent(resId,
k -> new ConcurrentLinkedQueue<>()).add(new
T2<>(res, err));
}
@@ -484,7 +486,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** {@inheritDoc} */
@Override public void addNotificationListener(ClientNotificationType type,
Long rsrcId, NotificationListener lsnr) {
- Queue<T2<ByteBuffer, Exception>> pendingQueue;
+ Queue<T2<ByteBuffer, Exception>> pendingQueue = null;
notificationLsnrsGuard.writeLock().lock();
@@ -499,7 +501,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
lsnrs.put(rsrcId, lsnr);
- pendingQueue = pendingNotifications[type.ordinal()].remove(rsrcId);
+ if (type.keepNotificationsWithoutListener())
+ pendingQueue =
pendingNotifications[type.ordinal()].remove(rsrcId);
}
finally {
notificationLsnrsGuard.writeLock().unlock();
@@ -522,7 +525,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
lsnrs.remove(rsrcId);
- pendingNotifications[type.ordinal()].remove(rsrcId);
+ if (type.keepNotificationsWithoutListener())
+ pendingNotifications[type.ordinal()].remove(rsrcId);
}
finally {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index c67184a..617948d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -84,6 +84,9 @@ public class TcpIgniteClient implements IgniteClient {
/** Services facade. */
private final ClientServicesImpl services;
+ /** Registered entry listeners for all caches. */
+ private final ClientCacheEntryListenersRegistry lsnrsRegistry;
+
/** Marshaller. */
private final ClientBinaryMarshaller marsh;
@@ -130,6 +133,8 @@ public class TcpIgniteClient implements IgniteClient {
compute = new ClientComputeImpl(ch, marsh,
cluster.defaultClusterGroup());
services = new ClientServicesImpl(ch, marsh,
cluster.defaultClusterGroup());
+
+ lsnrsRegistry = new ClientCacheEntryListenersRegistry();
}
catch (Exception e) {
ch.close();
@@ -148,7 +153,7 @@ public class TcpIgniteClient implements IgniteClient {
ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req ->
writeString(name, req.out()));
- return new TcpClientCache<>(name, ch, marsh, transactions);
+ return new TcpClientCache<>(name, ch, marsh, transactions,
lsnrsRegistry);
}
/** {@inheritDoc} */
@@ -157,7 +162,7 @@ public class TcpIgniteClient implements IgniteClient {
return new IgniteClientFutureImpl<>(
ch.requestAsync(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME,
req -> writeString(name, req.out()))
- .thenApply(x -> new TcpClientCache<>(name, ch, marsh,
transactions)));
+ .thenApply(x -> new TcpClientCache<>(name, ch, marsh,
transactions, lsnrsRegistry)));
}
/** {@inheritDoc} */
@@ -168,7 +173,7 @@ public class TcpIgniteClient implements IgniteClient {
ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION,
req -> serDes.cacheConfiguration(cfg, req.out(),
req.clientChannel().protocolCtx()));
- return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions);
+ return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions,
lsnrsRegistry);
}
/** {@inheritDoc} */
@@ -179,14 +184,14 @@ public class TcpIgniteClient implements IgniteClient {
return new IgniteClientFutureImpl<>(
ch.requestAsync(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION,
req -> serDes.cacheConfiguration(cfg, req.out(),
req.clientChannel().protocolCtx()))
- .thenApply(x -> new TcpClientCache<>(cfg.getName(),
ch, marsh, transactions)));
+ .thenApply(x -> new TcpClientCache<>(cfg.getName(),
ch, marsh, transactions, lsnrsRegistry)));
}
/** {@inheritDoc} */
@Override public <K, V> ClientCache<K, V> cache(String name) {
ensureCacheName(name);
- return new TcpClientCache<>(name, ch, marsh, transactions);
+ return new TcpClientCache<>(name, ch, marsh, transactions,
lsnrsRegistry);
}
/** {@inheritDoc} */
@@ -221,7 +226,7 @@ public class TcpIgniteClient implements IgniteClient {
ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req ->
writeString(name, req.out()));
- return new TcpClientCache<>(name, ch, marsh, transactions);
+ return new TcpClientCache<>(name, ch, marsh, transactions,
lsnrsRegistry);
}
/** {@inheritDoc} */
@@ -230,7 +235,7 @@ public class TcpIgniteClient implements IgniteClient {
return new IgniteClientFutureImpl<>(
ch.requestAsync(ClientOperation.CACHE_CREATE_WITH_NAME, req ->
writeString(name, req.out()))
- .thenApply(x -> new TcpClientCache<>(name, ch, marsh,
transactions)));
+ .thenApply(x -> new TcpClientCache<>(name, ch, marsh,
transactions, lsnrsRegistry)));
}
/** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class TcpIgniteClient implements IgniteClient {
ch.request(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION,
req -> serDes.cacheConfiguration(cfg, req.out(),
req.clientChannel().protocolCtx()));
- return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions);
+ return new TcpClientCache<>(cfg.getName(), ch, marsh, transactions,
lsnrsRegistry);
}
/** {@inheritDoc} */
@@ -251,7 +256,7 @@ public class TcpIgniteClient implements IgniteClient {
return new IgniteClientFutureImpl<>(
ch.requestAsync(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION,
req -> serDes.cacheConfiguration(cfg, req.out(),
req.clientChannel().protocolCtx()))
- .thenApply(x -> new TcpClientCache<>(cfg.getName(),
ch, marsh, transactions)));
+ .thenApply(x -> new TcpClientCache<>(cfg.getName(),
ch, marsh, transactions, lsnrsRegistry)));
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 0deb6c9..c34b38e 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -40,8 +40,6 @@ import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
import org.apache.ignite.internal.client.thin.ClientServerError;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
-import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -386,18 +384,6 @@ public class ReliabilityTest extends
AbstractThinClientTest {
}
/**
- * Drop all thin client connections on given Ignite instance.
- *
- * @param ignite Ignite.
- */
- private void dropAllThinClientConnections(Ignite ignite) {
- ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
- ClientListenerProcessor.class, ClientProcessorMXBean.class);
-
- mxBean.dropAllConnections();
- }
-
- /**
* Run the closure while Ignite nodes keep failing/recovering several
times.
*/
private void assertOnUnstableCluster(LocalIgniteCluster cluster, Runnable
clo) throws Exception {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
index 2e2f21d..486939d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
@@ -25,6 +25,8 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -94,4 +96,24 @@ public abstract class AbstractThinClientTest extends
GridCommonAbstractTest {
return startClient(Arrays.stream(igniteIdxs).mapToObj(igniteIdx ->
grid(igniteIdx).cluster().localNode())
.toArray(ClusterNode[]::new));
}
+
+    /**
+     * Drops every thin client connection of the given Ignite instance through its client processor MX bean.
+     *
+     * @param ignite Ignite.
+     */
+    protected void dropAllThinClientConnections(Ignite ignite) {
+        String instanceName = ignite.name();
+
+        ClientProcessorMXBean clientsBean =
+            getMxBean(instanceName, "Clients", ClientListenerProcessor.class, ClientProcessorMXBean.class);
+
+        clientsBean.dropAllConnections();
+    }
+
+    /**
+     * Drops thin client connections on every Ignite instance returned by {@code G.allGrids()}.
+     */
+    protected void dropAllThinClientConnections() {
+        G.allGrids().forEach(ignite -> dropAllThinClientConnections(ignite));
+    }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
new file mode 100644
index 0000000..49496bca
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.Cache;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryExpiredListener;
+import javax.cache.event.CacheEntryRemovedListener;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientDisconnectListener;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Thin client cache entry listeners test.
+ */
+public class CacheEntryListenersTest extends AbstractThinClientTest {
+ /** Timeout. */
+ private static final long TIMEOUT = 1_000L;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Starts the three grids used by the tests of this class.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(3);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Adds a client connector configuration whose thin client settings allow up to 100 active compute tasks
+ * per connection.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return
super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
+ new ClientConnectorConfiguration().setThinClientConfiguration(
+ new
ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(100)));
+ }
+
+ /**
+ * Test continuous queries.
+ * <p>
+ * Starts a continuous query with a local listener and checks that 10 puts of new keys, 10 puts over the
+ * existing keys and 10 removes are reported as {@code CREATED}, {@code UPDATED} and {@code REMOVED} events
+ * respectively. Removal events are expected to carry the last stored value ({@code -i}). No other events
+ * may remain in the listener queue at the end.
+ */
+ @Test
+ public void testContinuousQueries() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testCQ");
+
+ ContinuousQueryListener<Integer, Integer> lsnr = new
ContinuousQueryListener<>();
+
+ cache.query(new ContinuousQuery<Integer,
Integer>().setLocalListener(lsnr));
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ assertEquals(F.asMap(EventType.CREATED, IntStream.range(0,
10).boxed()
+ .collect(Collectors.toMap(i -> i, i -> i))),
aggregateListenerEvents(lsnr, 10));
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, -i);
+
+ assertEquals(F.asMap(EventType.UPDATED, IntStream.range(0,
10).boxed()
+ .collect(Collectors.toMap(i -> i, i -> -i))),
aggregateListenerEvents(lsnr, 10));
+
+ for (int i = 0; i < 10; i++)
+ cache.remove(i);
+
+ assertEquals(F.asMap(EventType.REMOVED, IntStream.range(0,
10).boxed()
+ .collect(Collectors.toMap(i -> i, i -> -i))),
aggregateListenerEvents(lsnr, 10));
+
+ assertTrue(lsnr.isQueueEmpty());
+ }
+ }
+
+ /**
+ * Test continuous queries with initial query.
+ * <p>
+ * Puts 100 entries, then starts a continuous query that uses a scan query as the initial query. The cursor
+ * must return the 100 pre-existing entries while the listener queue stays empty; a later put, update and
+ * remove of key 100 must be reported to the listener as {@code CREATED}, {@code UPDATED} and {@code REMOVED}.
+ */
+ @Test
+ public void testContinuousQueriesWithInitialQuery() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testCQWithInitQ");
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ ContinuousQueryListener<Integer, Integer> lsnr = new
ContinuousQueryListener<>();
+
+ QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(new
ContinuousQuery<Integer, Integer>()
+ .setInitialQuery(new ScanQuery<>()).setLocalListener(lsnr));
+
+ assertTrue(lsnr.isQueueEmpty());
+
+ assertEquals(100, cur.getAll().size());
+
+ cache.put(100, 100);
+
+ lsnr.assertNextCacheEvent(EventType.CREATED, 100, 100);
+
+ cache.put(100, 101);
+
+ lsnr.assertNextCacheEvent(EventType.UPDATED, 100, 101);
+
+ cache.remove(100);
+
+ lsnr.assertNextCacheEvent(EventType.REMOVED, 100);
+ }
+ }
+
+ /**
+ * Test continuous queries with include expired parameter.
+ * <p>
+ * Runs two continuous queries on the same cache, one with {@code includeExpired} disabled and one with it
+ * enabled, then puts 100 entries through a cache view whose expiry policy is 1 ms after creation. The first
+ * listener must see exactly 100 {@code CREATED} events; the second must see 200 events in total, 100
+ * {@code CREATED} and 100 {@code EXPIRED}.
+ */
+ @Test
+ public void testContinuousQueriesWithIncludeExpired() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testCQWithInclExp");
+
+ ContinuousQueryListener<Integer, Integer> lsnr1 = new
ContinuousQueryListener<>();
+ ContinuousQueryListener<Integer, Integer> lsnr2 = new
ContinuousQueryListener<>();
+
+ ContinuousQuery<Integer, Integer> qry1 = new
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr1);
+ ContinuousQuery<Integer, Integer> qry2 = new
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr2);
+
+ qry1.setIncludeExpired(false);
+ qry2.setIncludeExpired(true);
+
+ cache.query(qry1);
+ cache.query(qry2);
+
+ cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new
Duration(TimeUnit.MILLISECONDS, 1)));
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ int cntCreated = 0;
+ int cntExpired = 0;
+
+ for (int i = 0; i < 100; i++)
+ assertEquals(EventType.CREATED, lsnr1.poll().getEventType());
+
+ for (int i = 0; i < 200; i++) { // There should be two events for
each cache entry.
+ CacheEntryEvent<?, ?> evt = lsnr2.poll();
+
+ if (evt.getEventType() == EventType.CREATED)
+ cntCreated++;
+ else if (evt.getEventType() == EventType.EXPIRED)
+ cntExpired++;
+ else
+ fail("Unexpected event type: " + evt.getEventType());
+ }
+
+ assertEquals(100, cntCreated);
+ assertEquals(100, cntExpired);
+
+ assertTrue(lsnr1.isQueueEmpty());
+ assertTrue(lsnr2.isQueueEmpty());
+ }
+ }
+
+ /**
+ * Test continuous queries with page size parameter.
+ * <p>
+ * The client is started against nodes 1 and 2 while all keys are primary on node 0. With a page size of 10,
+ * 15 puts are expected to produce exactly 10 events (one page), and 6 further puts are expected to produce
+ * exactly 10 more; the listener queue must be empty after each page is consumed.
+ */
+ @Test
+ public void testContinuousQueriesWithPageSize() throws Exception {
+ try (IgniteClient client = startClient(1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testCQWithPageSize");
+ IgniteCache<Integer, Integer> nodeCache =
grid(0).getOrCreateCache(cache.getName());
+
+ ContinuousQueryListener<Integer, Integer> lsnr = new
ContinuousQueryListener<>();
+
+ ContinuousQuery<Integer, Integer> qry = new
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr)
+ .setPageSize(10);
+
+ cache.query(qry);
+
+ // Each node has its own buffer, put data to the exactly one
remote node.
+ primaryKeys(nodeCache, 15).forEach(key -> cache.put(key, key));
+
+ // Check that only first page is received.
+ aggregateListenerEvents(lsnr, 10);
+
+ assertTrue(lsnr.isQueueEmpty());
+
+ primaryKeys(nodeCache, 6).forEach(key -> cache.put(key, key));
+
+ // Check that only second page is received.
+ aggregateListenerEvents(lsnr, 10);
+
+ assertTrue(lsnr.isQueueEmpty());
+ }
+ }
+
+ /** Test continuous queries with time interval parameter. */
+ @Test
+ public void testContinuousQueriesWithTimeInterval() throws Exception {
+ try (IgniteClient client = startClient(1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testCQWithTimeInterval");
+ IgniteCache<Integer, Integer> nodeCache =
grid(0).getOrCreateCache(cache.getName());
+
+ ContinuousQueryListener<Integer, Integer> lsnr = new
ContinuousQueryListener<>();
+
+ ContinuousQuery<Integer, Integer> qry = new
ContinuousQuery<Integer, Integer>().setLocalListener(lsnr)
+ .setPageSize(10).setTimeInterval(TIMEOUT);
+
+ long ts1 = U.currentTimeMillis();
+
+ cache.query(qry);
+
+ // Put data to the remote node.
+ int key = primaryKey(nodeCache);
+ cache.put(key, key);
+
+ assertNotNull(lsnr.poll(TIMEOUT * 2));
+
+ assertTrue(lsnr.isQueueEmpty());
+
+ long ts2 = U.currentTimeMillis();
+
+ // Ensure that item was received after timeout.
+ assertTrue("ts2 - ts1 = " + (ts2 - ts1), ts2 - ts1 >= TIMEOUT);
+ }
+ }
+
+ /** Test JCache entry listeners. */
+ @Test
+ public void testJCacheListeners() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testJCacheListeners");
+
+ JCacheEntryListener<Integer, Integer> lsnr = new
JCacheEntryListener<>();
+
+ cache.registerCacheEntryListener(new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr, null, true, false));
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ assertEquals(F.asMap(EventType.CREATED, IntStream.range(0,
10).boxed()
+ .collect(Collectors.toMap(i -> i, i -> i))),
aggregateListenerEvents(lsnr, 10));
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, -i);
+
+ assertEquals(F.asMap(EventType.UPDATED, IntStream.range(0,
10).boxed()
+ .collect(Collectors.toMap(i -> i, i -> -i))),
aggregateListenerEvents(lsnr, 10));
+
+ for (int i = 0; i < 10; i++)
+ cache.remove(i);
+
+ assertEquals(F.asMap(EventType.REMOVED, IntStream.range(0,
10).boxed()
+ .collect(Collectors.toMap(i -> i, i -> -i))),
aggregateListenerEvents(lsnr, 10));
+
+ assertTrue(lsnr.isQueueEmpty());
+ }
+ }
+
+ /** Test JCache entry listeners with expired entries. */
+ @Test
+ public void testJCacheListenersExpiredEntries() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testJCacheListenersWithExp");
+
+ JCacheEntryListener<Integer, Integer> lsnr = new
JCacheEntryListener<>();
+
+ cache.registerCacheEntryListener(new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr, null, true, false));
+
+ cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new
Duration(TimeUnit.MILLISECONDS, 1)));
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ int cntCreated = 0;
+ int cntExpired = 0;
+
+ for (int i = 0; i < 200; i++) { // There should be two events for
each cache entry.
+ CacheEntryEvent<?, ?> evt = lsnr.poll();
+
+ if (evt.getEventType() == EventType.CREATED)
+ cntCreated++;
+ else if (evt.getEventType() == EventType.EXPIRED)
+ cntExpired++;
+ else
+ fail("Unexpected event type: " + evt.getEventType());
+ }
+
+ assertEquals(100, cntCreated);
+ assertEquals(100, cntExpired);
+
+ assertTrue(lsnr.isQueueEmpty());
+ }
+ }
+
+ /** Test continuous queries and JCache entry listeners with keep binary
flag. */
+ @Test
+ public void testListenersWithKeepBinary() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Object, Object> cache1 =
client.getOrCreateCache("testListenersWithKB");
+ ClientCache<Object, Object> cache2 = cache1.withKeepBinary();
+
+ ContinuousQueryListener<Object, Object> lsnr1 = new
ContinuousQueryListener<>();
+ ContinuousQueryListener<Object, Object> lsnr2 = new
ContinuousQueryListener<>();
+
+ cache1.query(new ContinuousQuery<>().setLocalListener(lsnr1));
+ cache2.query(new ContinuousQuery<>().setLocalListener(lsnr2));
+
+ JCacheEntryListener<Object, Object> lsnr3 = new
JCacheEntryListener<>();
+ JCacheEntryListener<Object, Object> lsnr4 = new
JCacheEntryListener<>();
+
+ cache1.registerCacheEntryListener(new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr3, null, true, false));
+ cache2.registerCacheEntryListener(new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr4, null, true, false));
+
+ Person person1 = new Person(0, "name");
+ Person person2 = new Person(1, "another name");
+
+ cache1.put(0, person1);
+
+ lsnr1.assertNextCacheEvent(EventType.CREATED, 0, person1);
+ lsnr2.assertNextCacheEvent(EventType.CREATED, 0,
client.binary().toBinary(person1));
+ lsnr3.assertNextCacheEvent(EventType.CREATED, 0, person1);
+ lsnr4.assertNextCacheEvent(EventType.CREATED, 0,
client.binary().toBinary(person1));
+
+ cache1.put(0, person2);
+
+ lsnr1.assertNextCacheEvent(EventType.UPDATED, 0, person2);
+ lsnr2.assertNextCacheEvent(EventType.UPDATED, 0,
client.binary().toBinary(person2));
+ lsnr3.assertNextCacheEvent(EventType.UPDATED, 0, person2);
+ lsnr4.assertNextCacheEvent(EventType.UPDATED, 0,
client.binary().toBinary(person2));
+ }
+ }
+
+ /** Test continuous queries and JCache entry listeners with remote
filters. */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testListenersWithRemoteFilter() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testListenersWithRmtFilter");
+
+ CacheEntryEventSerializableFilter<Integer, Integer> rmtFilter =
evt -> (evt.getKey() & 1) == 0;
+
+ ContinuousQueryListener<Integer, Integer> lsnr1 = new
ContinuousQueryListener<>();
+ ContinuousQueryListener<Integer, Integer> lsnr2 = new
ContinuousQueryListener<>();
+
+ cache.query(new ContinuousQuery<Integer,
Integer>().setLocalListener(lsnr1)
+ .setRemoteFilterFactory(() -> rmtFilter));
+
+ cache.query(new ContinuousQuery<Integer,
Integer>().setLocalListener(lsnr2)
+ .setRemoteFilter(rmtFilter));
+
+ JCacheEntryListener<Integer, Integer> lsnr3 = new
JCacheEntryListener<>();
+
+ cache.registerCacheEntryListener(new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr3, () -> rmtFilter, true, false));
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, i);
+
+ Map<EventType, Map<Integer, Integer>> expRes =
F.asMap(EventType.CREATED,
+ IntStream.range(0, 5).boxed().collect(Collectors.toMap(i -> i
* 2, i -> i * 2)));
+
+ assertEquals(expRes, aggregateListenerEvents(lsnr1, 5));
+ assertEquals(expRes, aggregateListenerEvents(lsnr2, 5));
+ assertEquals(expRes, aggregateListenerEvents(lsnr3, 5));
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, -i);
+
+ expRes = F.asMap(EventType.UPDATED,
+ IntStream.range(0, 5).boxed().collect(Collectors.toMap(i -> i
* 2, i -> -i * 2)));
+
+ assertEquals(expRes, aggregateListenerEvents(lsnr1, 5));
+ assertEquals(expRes, aggregateListenerEvents(lsnr2, 5));
+ assertEquals(expRes, aggregateListenerEvents(lsnr3, 5));
+
+ for (int i = 0; i < 10; i++)
+ cache.remove(i);
+
+ expRes = F.asMap(EventType.REMOVED,
+ IntStream.range(0, 5).boxed().collect(Collectors.toMap(i -> i
* 2, i -> -i * 2)));
+
+ assertEquals(expRes, aggregateListenerEvents(lsnr1, 5));
+ assertEquals(expRes, aggregateListenerEvents(lsnr2, 5));
+ assertEquals(expRes, aggregateListenerEvents(lsnr3, 5));
+
+ assertTrue(lsnr1.isQueueEmpty());
+ assertTrue(lsnr2.isQueueEmpty());
+ assertTrue(lsnr3.isQueueEmpty());
+ }
+ }
+
+ /** Test disconnect event for cache entry listeners. */
+ @Test
+ public void testDisconnectListeners() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Object, Object> cache =
client.getOrCreateCache("testDisconnect");
+
+ ContinuousQueryListener<Object, Object> lsnr1 = new
ContinuousQueryListener<>();
+
+ cache.query(new ContinuousQuery<>().setLocalListener(lsnr1),
lsnr1);
+
+ JCacheEntryListener<Object, Object> lsnr2 = new
JCacheEntryListener<>();
+
+ CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr2, null, true, false);
+
+ cache.registerCacheEntryListener(lsnrCfg, lsnr2);
+
+ cache.put(0, 0);
+
+ lsnr1.assertNextCacheEvent(EventType.CREATED, 0, 0);
+ lsnr2.assertNextCacheEvent(EventType.CREATED, 0, 0);
+
+ dropAllThinClientConnections();
+
+ // Can't detect channel failure until we send something to server.
+ cache.put(1, 1);
+
+ assertTrue(lsnr1.isQueueEmpty());
+ assertTrue(lsnr2.isQueueEmpty());
+
+ assertTrue(waitForCondition(lsnr1::isDisconnected, TIMEOUT));
+ assertTrue(waitForCondition(lsnr2::isDisconnected, TIMEOUT));
+
+ // Should be able to register the same listener on the same cache
again.
+ cache.registerCacheEntryListener(lsnrCfg);
+ }
+ }
+
+ /** Test registration and deregistration of JCache entry listeners. */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testRegisterDeregisterListener() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ String cacheName = "registerListener";
+
+ ClientCache<Integer, Integer> cache0 =
client.getOrCreateCache(cacheName);
+
+ CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new
MutableCacheEntryListenerConfiguration<>(
+ JCacheEntryListener::new,
+ null,
+ true,
+ false
+ );
+
+ cache0.registerCacheEntryListener(lsnrCfg);
+
+ ClientCache<Integer, Integer> cache1 =
client.getOrCreateCache(cacheName + '2');
+
+ // Can register the same listener on another cache.
+ cache1.registerCacheEntryListener(lsnrCfg);
+
+ ClientCache<Integer, Integer> cache2 = client.cache(cacheName);
+
+ // Can't register the same listener on the same cache.
+ assertThrowsWithCause(() ->
cache2.registerCacheEntryListener(lsnrCfg), IllegalStateException.class);
+
+ ClientCache<Integer, Integer> cache3 = client.cache(cacheName);
+
+ cache3.deregisterCacheEntryListener(lsnrCfg);
+
+ // Can register the listener after deregistration.
+ cache2.registerCacheEntryListener(lsnrCfg);
+ }
+ }
+
+ /** Test that unsupported or invalid listener parameters are rejected. */
+ @Test
+ @SuppressWarnings({"ThrowableNotThrown", "deprecation"})
+ public void testListenersUnsupportedParameters() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testUnsupportedParams");
+
+ // Check null listener factory.
+ CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg1 = new
MutableCacheEntryListenerConfiguration<>(
+ null,
+ null,
+ true,
+ false
+ );
+
+ assertThrowsWithCause(() ->
cache.registerCacheEntryListener(lsnrCfg1), NullPointerException.class);
+
+ // Check synchronous flag.
+ CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg2 = new
MutableCacheEntryListenerConfiguration<>(
+ JCacheEntryListener::new,
+ null,
+ true,
+ true
+ );
+
+ assertThrowsWithCause(() ->
cache.registerCacheEntryListener(lsnrCfg2), IllegalArgumentException.class);
+
+ // Check local flag.
+ ContinuousQueryListener<Integer, Integer> cqLsnr = new
ContinuousQueryListener<>();
+
+ ContinuousQuery<Integer, Integer> qry1 = new
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+ .setLocal(true);
+
+ assertThrowsWithCause(() -> cache.query(qry1),
IllegalArgumentException.class);
+
+ // Check null listener.
+ ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+ assertThrowsWithCause(() -> cache.query(qry2),
NullPointerException.class);
+
+ // Check auto unsubscribe flag.
+ ContinuousQuery<Integer, Integer> qry3 = new
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+ .setAutoUnsubscribe(false);
+
+ assertThrowsWithCause(() -> cache.query(qry3),
IllegalArgumentException.class);
+
+ // Check continuous query as initial query.
+ ContinuousQuery<Integer, Integer> qry4 = new
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+ .setInitialQuery(new ContinuousQuery<>());
+
+ assertThrowsWithCause(() -> cache.query(qry4),
IllegalArgumentException.class);
+
+ // Check filter factory and filter defined at the same time.
+ CacheEntryEventSerializableFilter<Integer, Integer> rmtFilter = r
-> true;
+
+ ContinuousQuery<Integer, Integer> qry5 = new
ContinuousQuery<Integer, Integer>().setLocalListener(cqLsnr)
+ .setRemoteFilter(rmtFilter);
+
+ qry5.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFilter));
+
+ assertThrowsWithCause(() -> cache.query(qry5),
IllegalArgumentException.class);
+ }
+ }
+
+ /** Test that closed queries and deregistered listeners get no events. */
+ @Test
+ public void testListenersClose() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testListenersClose");
+
+ ContinuousQueryListener<Integer, Integer> lsnr1 = new
ContinuousQueryListener<>();
+
+ QueryCursor<?> qry = cache.query(new ContinuousQuery<Integer,
Integer>().setLocalListener(lsnr1));
+
+ JCacheEntryListener<Integer, Integer> lsnr2 = new
JCacheEntryListener<>();
+
+ CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new
MutableCacheEntryListenerConfiguration<>(
+ () -> lsnr2, null, true, false);
+
+ cache.registerCacheEntryListener(lsnrCfg);
+
+ cache.put(0, 0);
+
+ lsnr1.assertNextCacheEvent(EventType.CREATED, 0, 0);
+ lsnr2.assertNextCacheEvent(EventType.CREATED, 0, 0);
+
+ qry.close();
+ cache.deregisterCacheEntryListener(lsnrCfg);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ assertTrue(lsnr1.isQueueEmpty());
+ assertTrue(lsnr2.isQueueEmpty());
+ }
+ }
+
+ /** Test continuous queries running concurrently with compute tasks. */
+ @Test
+ public void testContinuousQueriesWithConcurrentCompute() throws Exception {
+ try (IgniteClient client = startClient(0, 1, 2)) {
+ int threadsCnt = 20;
+ int iterations = 50;
+
+ Set<UUID> allNodesIds = new
HashSet<>(F.nodeIds(grid(0).cluster().nodes()));
+
+ AtomicInteger threadIdxs = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(
+ () -> {
+ int threadIdx = threadIdxs.incrementAndGet();
+
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("testCQwithCompute" + threadIdx);
+
+ try {
+ for (int i = 0; i < iterations; i++) {
+ ContinuousQueryListener<Integer, Integer> lsnr =
new ContinuousQueryListener<>();
+
+ QueryCursor<?> cur = cache.query(new
ContinuousQuery<Integer, Integer>()
+ .setLocalListener(lsnr));
+
+ cache.put(i, i);
+
+ Future<T2<UUID, Set<UUID>>> fut =
client.compute().executeAsync2(TestTask.class.getName(),
+ null);
+
+ assertEquals(allNodesIds, fut.get().get2());
+
+ lsnr.assertNextCacheEvent(EventType.CREATED, i, i);
+
+ assertTrue(lsnr.isQueueEmpty());
+
+ cur.close();
+ }
+ }
+ catch (Exception e) {
+ log.error("Failure: ", e);
+
+ fail();
+ }
+ }, threadsCnt, "run-task-async"
+ );
+ }
+ }
+
+ /** Polls {@code evtsCnt} events and groups key-value pairs by event type. */
+ private static <K, V> Map<EventType, Map<K, V>>
aggregateListenerEvents(ContinuousQueryListener<K, V> lsnr,
+ int evtsCnt) throws Exception {
+ Map<EventType, Map<K, V>> res = new EnumMap<>(EventType.class);
+
+ for (int i = 0; i < evtsCnt; i++) {
+ CacheEntryEvent<? extends K, ? extends V> evt = lsnr.poll();
+
+ Map<K, V> locMap = res.computeIfAbsent(evt.getEventType(), k ->
new HashMap<>());
+
+ locMap.put(evt.getKey(), evt.getValue());
+ }
+
+ return res;
+ }
+
+ /** Listener that queues cache entry events and tracks disconnect. */
+ private static class ContinuousQueryListener<K, V> implements
CacheEntryUpdatedListener<K, V>,
+ ClientDisconnectListener {
+ /** Queue of received cache entry events. */
+ private final BlockingQueue<CacheEntryEvent<? extends K, ? extends V>>
evtsQ =
+ new LinkedBlockingQueue<>();
+
+ /** Disconnected flag. */
+ private volatile boolean disconnected;
+
+ /** Failure. */
+ private volatile Exception failure;
+
+ /**
+ * @param expectedEvtType Expected event type ({@code null} for any
event type).
+ * @param evt Event.
+ */
+ protected void addEvent(EventType expectedEvtType, CacheEntryEvent<?
extends K, ? extends V> evt) {
+ if (expectedEvtType != null && evt.getEventType() !=
expectedEvtType)
+ failure = new Exception("Unexpected event type [expEvtType=" +
expectedEvtType + ", evt=" + evt + ']');
+ else
+ evtsQ.add(evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K,
? extends V>> events) {
+ events.forEach(evt -> addEvent(null, evt));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(Exception reason) {
+ disconnected = true;
+ }
+
+ /**
+ * Poll next cache event within the timeout (ms), fail if there is none.
+ */
+ public CacheEntryEvent<? extends K, ? extends V> poll(long timeout)
throws Exception {
+ if (failure != null)
+ throw failure;
+
+ CacheEntryEvent<? extends K, ? extends V> evt =
evtsQ.poll(timeout, TimeUnit.MILLISECONDS);
+
+ assertNotNull(evt);
+
+ return evt;
+ }
+
+ /**
+ * Poll next cache event, waiting up to {@code TIMEOUT}.
+ */
+ public CacheEntryEvent<? extends K, ? extends V> poll() throws
Exception {
+ return poll(TIMEOUT);
+ }
+
+ /**
+ * Assert event type and key of the next cache event.
+ */
+ public void assertNextCacheEvent(EventType expType, K expKey) throws
Exception {
+ CacheEntryEvent<? extends K, ? extends V> evt = poll();
+ assertEquals(expType, evt.getEventType());
+ assertEquals(expKey, evt.getKey());
+ }
+
+ /**
+ * Assert event type, key and value of the next cache event.
+ */
+ public void assertNextCacheEvent(EventType expType, K expKey, V
expVal) throws Exception {
+ CacheEntryEvent<? extends K, ? extends V> evt = poll();
+ assertEquals(expType, evt.getEventType());
+ assertEquals(expKey, evt.getKey());
+ assertEquals(expVal, evt.getValue());
+ }
+
+ /** @return {@code true} if disconnect notification was received. */
+ public boolean isDisconnected() {
+ return disconnected;
+ }
+
+ /** @return {@code true} if there are no pending events in the queue. */
+ public boolean isQueueEmpty() {
+ return evtsQ.isEmpty();
+ }
+ }
+
+ /** JCache listener that checks event type against the invoked callback. */
+ private static class JCacheEntryListener<K, V> extends
ContinuousQueryListener<K, V> implements
+ CacheEntryCreatedListener<K, V>, CacheEntryRemovedListener<K, V>,
CacheEntryExpiredListener<K, V> {
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends K,
? extends V>> events) {
+ events.forEach(evt -> addEvent(EventType.CREATED, evt));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K,
? extends V>> events) {
+ events.forEach(evt -> addEvent(EventType.UPDATED, evt));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K,
? extends V>> events) {
+ events.forEach(evt -> addEvent(EventType.REMOVED, evt));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onExpired(Iterable<CacheEntryEvent<? extends K,
? extends V>> events) {
+ events.forEach(evt -> addEvent(EventType.EXPIRED, evt));
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 69d6fda..44b6425 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -35,8 +35,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientClusterGroup;
@@ -51,11 +49,8 @@ import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.ThinClientConfiguration;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -647,18 +642,6 @@ public class ComputeTaskTest extends
AbstractThinClientTest {
}
/**
- *
- */
- private void dropAllThinClientConnections() {
- for (Ignite ignite : G.allGrids()) {
- ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
- ClientListenerProcessor.class, ClientProcessorMXBean.class);
-
- mxBean.dropAllConnections();
- }
- }
-
- /**
* Compute task with latch on routing node.
*/
@ComputeTaskName("TestLatchTask")
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index f61083b..88ee8eb 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -319,7 +319,7 @@ public class ReliableChannelTest {
ClientBinaryMarshaller marsh = mock(ClientBinaryMarshaller.class);
TcpClientTransactions transactions = mock(TcpClientTransactions.class);
- TcpClientCache cache = new TcpClientCache("", rc, marsh, transactions,
false, null);
+ TcpClientCache cache = new TcpClientCache("", rc, marsh, transactions,
null, false, null);
GridTestUtils.assertThrowsWithCause(() -> op.accept(cache),
TestChannelException.class);
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 48d346f..85255ce 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client;
import org.apache.ignite.internal.client.thin.CacheAsyncTest;
+import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
import org.apache.ignite.internal.client.thin.ClusterApiTest;
import org.apache.ignite.internal.client.thin.ClusterGroupTest;
import org.apache.ignite.internal.client.thin.ComputeTaskTest;
@@ -56,6 +57,7 @@ import org.junit.runners.Suite;
ClusterApiTest.class,
ClusterGroupTest.class,
ServicesTest.class,
+ CacheEntryListenersTest.class,
ThinClientPartitionAwarenessStableTopologyTest.class,
ThinClientPartitionAwarenessUnstableTopologyTest.class,
ThinClientPartitionAwarenessResourceReleaseTest.class,