This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5c26896d886 IGNITE-22028 Java Thin: Implement invoke/invokeAll
operations - Fixes #11358.
5c26896d886 is described below
commit 5c26896d886ab6659b4aa303720830e27409877e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri May 24 00:01:10 2024 +0300
IGNITE-22028 Java Thin: Implement invoke/invokeAll operations - Fixes
#11358.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../org/apache/ignite/snippets/JavaThinClient.java | 20 ++
docs/_docs/thin-client-comparison.csv | 7 +-
docs/_docs/thin-clients/java-thin-client.adoc | 20 ++
.../java/org/apache/ignite/client/ClientCache.java | 136 ++++++++
.../apache/ignite/client/ClientOperationType.java | 11 +
.../internal/client/thin/ClientJCacheAdapter.java | 18 +-
.../internal/client/thin/ClientOperation.java | 12 +
.../internal/client/thin/ClientServerError.java | 11 +
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/TcpClientCache.java | 158 +++++++++
.../platform/client/ClientBitmaskFeature.java | 5 +-
.../platform/client/ClientMessageParser.java | 14 +
.../processors/platform/client/ClientStatus.java | 3 +
.../client/cache/ClientCacheInvokeAllRequest.java | 59 ++++
.../client/cache/ClientCacheInvokeAllResponse.java | 76 +++++
.../client/cache/ClientCacheInvokeRequest.java | 154 +++++++++
.../internal/client/thin/BlockingTxOpsTest.java | 27 ++
.../ignite/internal/client/thin/InvokeTest.java | 355 +++++++++++++++++++++
.../org/apache/ignite/client/ClientTestSuite.java | 2 +
19 files changed, 1082 insertions(+), 11 deletions(-)
diff --git
a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
index b4979acd03d..50bb9cd3f93 100644
---
a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
+++
b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
@@ -521,4 +521,24 @@ public class JavaThinClient {
private static interface MyService {
public void myServiceMethod();
}
+
+ void entryProcessor() throws Exception {
+ ClientConfiguration clientCfg = new
ClientConfiguration().setAddresses("127.0.0.1:10800");
+
+ try (IgniteClient client = Ignition.startClient(clientCfg)) {
+ //tag::entry-processor[]
+ ClientCache<Integer, Integer> cache =
client.getOrCreateCache("myCache");
+ cache.invoke(0, new IncrementProcessor());
+ //end::entry-processor[]
+ }
+ }
+
+ //tag::entry-processor-class[]
+ public class IncrementProcessor implements EntryProcessor<Integer,
Integer, Integer> {
+ @Override public Integer process(MutableEntry<Integer, Integer> entry,
Object... arguments) {
+ entry.setValue(entry.getValue() == null ? 1 : entry.getValue() +
1);
+ return entry.getValue();
+ }
+ }
+ //end::entry-processor-class[]
}
diff --git a/docs/_docs/thin-client-comparison.csv
b/docs/_docs/thin-client-comparison.csv
index 1c9bd884543..aa42639dfb8 100644
--- a/docs/_docs/thin-client-comparison.csv
+++ b/docs/_docs/thin-client-comparison.csv
@@ -1,7 +1,7 @@
Thin Client Feature,Java,.NET,C++,Python,Node.js,PHP
-Scan Query,{yes},{yes},No,{yes},{yes},{yes}
+Scan Query,{yes},{yes},{yes},{yes},{yes},{yes}
Scan Query with a filter,{yes},{yes},No,No,No,No
-SqlFieldsQuery,{yes},{yes},No,{yes},{yes},{yes}
+SqlFieldsQuery,{yes},{yes},{yes},{yes},{yes},{yes}
Binary Object API,{yes},{yes},No,No,{yes},{yes}
Failover,{yes},{yes},{yes},{yes},{yes},{yes}
Async Operations,{yes},{yes},No,{yes},{yes},{yes}
@@ -16,4 +16,5 @@ Service invocation,{yes},{yes},No,No,No,No
Server Discovery,{yes},{yes},No,No,No,No
Server Discovery in Kubernetes,{yes},No,No,No,No,No
Data Streamer,No,{Yes},No,No,No,No
-Retry Policy,{yes},{yes},No,No,No,No
\ No newline at end of file
+Retry Policy,{yes},{yes},No,No,No,No
+Entry processor invocation,{yes},No,No,No,No,No
\ No newline at end of file
diff --git a/docs/_docs/thin-clients/java-thin-client.adoc
b/docs/_docs/thin-clients/java-thin-client.adoc
index 5a744d545c4..dcc6ac5a5ac 100644
--- a/docs/_docs/thin-clients/java-thin-client.adoc
+++ b/docs/_docs/thin-clients/java-thin-client.adoc
@@ -150,6 +150,26 @@ The following code snippet demonstrates how to execute
basic cache operations fr
include::{sourceCodeFile}[tag=key-value-operations,indent=0]
-------------------------------------------------------------------------------
+=== Entry Processor
+
+An entry processor is used to process cache entries on the nodes where they
are stored. An entry processor does not require the entry to be transferred to
the client in order to perform an operation on it. The operation is performed
remotely, and only the results are transmitted to the client.
+
+Define an entry processor as follows:
+
+[source, java]
+----
+include::{sourceCodeFile}[tags=entry-processor-class,indent=0]
+----
+
+NOTE: The classes of the entry processors must be available on the server
nodes of the cluster.
+
+Then invoke the entry processor:
+
+[source, java]
+----
+include::{sourceCodeFile}[tags=entry-processor,indent=0]
+----
+
=== Executing Scan Queries
Use the `ScanQuery<K, V>` class to get a set of entries that satisfy a given
condition. The thin client sends the query to the cluster node where it is
executed as a regular link:key-value-api/using-cache-queries[scan query].
diff --git
a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
index 1e6bb917e8a..413b8e2c003 100644
--- a/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientCache.java
@@ -25,6 +25,9 @@ import java.util.UUID;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -666,6 +669,139 @@ public interface ClientCache<K, V> {
*/
public IgniteClientFuture<Void> clearAllAsync(Set<? extends K> keys)
throws ClientException;
+ /**
+ * Invokes an {@link EntryProcessor} against the {@link
javax.cache.Cache.Entry} specified by
+ * the provided key. If an {@link javax.cache.Cache.Entry} does not exist
for the specified key,
+ * an attempt is made to load it (if a loader is configured) or a surrogate
+ * {@link javax.cache.Cache.Entry}, consisting of the key with a null
value is used instead.
+ * <p>
+ * An instance of entry processor must be stateless as it may be invoked
multiple times on primary and
+ * backup nodes in the cache. It is guaranteed that the value passed to
the entry processor will be always
+ * the same.
+ * <p>
+ *
+ * @param key The key to the entry.
+ * @param entryProc The {@link EntryProcessor} to invoke.
+ * @param arguments Additional arguments to pass to the {@link
EntryProcessor}.
+ * @param <T> Type of the cache entry processing result.
+ * @return The result of the processing, if any, defined by the {@link
EntryProcessor} implementation.
+ * @throws NullPointerException If key or {@link EntryProcessor} is null.
+ * @throws EntryProcessorException If an exception is thrown by the {@link
+ * EntryProcessor}, a Caching
Implementation
+ * must wrap any {@link Exception} thrown
+ * wrapped in an {@link
EntryProcessorException}.
+ * @throws ClientException If operation is failed.
+ */
+ public <T> T invoke(
+ K key,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws EntryProcessorException, ClientException;
+
+ /**
+ * Asynchronously invokes an {@link EntryProcessor} against the {@link
javax.cache.Cache.Entry} specified by
+ * the provided key. If an {@link javax.cache.Cache.Entry} does not exist
for the specified key,
+ * an attempt is made to load it (if a loader is configured) or a surrogate
+ * {@link javax.cache.Cache.Entry}, consisting of the key with a null
value is used instead.
+ * <p>
+ * An instance of entry processor must be stateless as it may be invoked
multiple times on primary and
+ * backup nodes in the cache. It is guaranteed that the value passed to
the entry processor will be always
+ * the same.
+ * <p>
+ *
+ * @param key The key to the entry.
+ * @param entryProc The {@link EntryProcessor} to invoke.
+ * @param arguments Additional arguments to pass to the {@link
EntryProcessor}.
+ * @param <T> Type of the cache entry processing result.
+ * @return Future representing pending completion of the operation.
+ * @throws NullPointerException If key or {@link EntryProcessor} is null.
+ * @throws ClientException If operation is failed.
+ */
+ public <T> IgniteClientFuture<T> invokeAsync(
+ K key,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws ClientException;
+
+ /**
+ * Invokes each {@link EntryProcessor} against the set of {@link
javax.cache.Cache.Entry}s specified by
+ * the set of keys.
+ * <p>
+ * If an {@link javax.cache.Cache.Entry} does not exist for the specified
key, an attempt is made
+ * to load it (if a loader is configured) or a surrogate {@link
javax.cache.Cache.Entry},
+ * consisting of the key and a value of null is provided.
+ * <p>
+ * The order that the entries for the keys are processed is undefined.
+ * Implementations may choose to process the entries in any order,
including
+ * concurrently. Furthermore there is no guarantee implementations will
+ * use the same {@link EntryProcessor} instance to process each entry, as
+ * the case may be in a non-local cache topology.
+ * <p>
+ * The result of executing the {@link EntryProcessor} is returned as a
+ * {@link Map} of {@link EntryProcessorResult}s, one result per key.
Should the
+ * {@link EntryProcessor} or Caching implementation throw an exception, the
+ * exception is wrapped and re-thrown when a call to
+ * {@link javax.cache.processor.EntryProcessorResult#get()} is made.
+ * <p>
+ * Keys are locked in the order in which they appear in key set. It is
caller's responsibility to
+ * make sure keys always follow same order, such as by using {@link
java.util.TreeSet}. Using unordered map,
+ * such as {@link java.util.HashSet}, while calling this method in
parallel <b>will lead to deadlock</b>.
+ *
+ * @param keys The set of keys for entries to proces.
+ * @param entryProc The EntryProcessor to invoke.
+ * @param args Additional arguments to pass to the {@link EntryProcessor}.
+ * @param <T> Type of the cache entry processing result.
+ * @return The map of {@link EntryProcessorResult}s of the processing per
key,
+ * if any, defined by the {@link EntryProcessor} implementation. No
mappings
+ * will be returned for {@link EntryProcessor}s that return a
+ * <code>null</code> value for a key.
+ * @throws NullPointerException If keys or {@link EntryProcessor} is null.
+ * @throws ClientException If operation is failed.
+ */
+ public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProc,
+ Object... args
+ ) throws ClientException;
+
+ /**
+ * Asynchronously invokes each {@link EntryProcessor} against the set of
{@link javax.cache.Cache.Entry}s
+ * specified by the set of keys.
+ * <p>
+ * If an {@link javax.cache.Cache.Entry} does not exist for the specified
key, an attempt is made
+ * to load it (if a loader is configured) or a surrogate {@link
javax.cache.Cache.Entry},
+ * consisting of the key and a value of null is provided.
+ * <p>
+ * The order that the entries for the keys are processed is undefined.
+ * Implementations may choose to process the entries in any order,
including
+ * concurrently. Furthermore there is no guarantee implementations will
+ * use the same {@link EntryProcessor} instance to process each entry, as
+ * the case may be in a non-local cache topology.
+ * <p>
+ * The result of executing the {@link EntryProcessor} is returned in the
future as a
+ * {@link Map} of {@link EntryProcessorResult}s, one result per key.
Should the
+ * {@link EntryProcessor} or Caching implementation throw an exception, the
+ * exception is wrapped and re-thrown when a call to
+ * {@link javax.cache.processor.EntryProcessorResult#get()} is made.
+ * <p>
+ * Keys are locked in the order in which they appear in key set. It is
caller's responsibility to
+ * make sure keys always follow same order, such as by using {@link
java.util.TreeSet}. Using unordered map,
+ * such as {@link java.util.HashSet}, while calling this method in
parallel <b>will lead to deadlock</b>.
+ *
+ * @param keys The set of keys for entries to proces.
+ * @param entryProc The EntryProcessor to invoke.
+ * @param args Additional arguments to pass to the {@link EntryProcessor}.
+ * @param <T> Type of the cache entry processing result.
+ * @return Future representing pending completion of the operation.
+ * @throws NullPointerException If keys or {@link EntryProcessor} is null.
+ * @throws ClientException If operation is failed.
+ */
+ public <T> IgniteClientFuture<Map<K, EntryProcessorResult<T>>>
invokeAllAsync(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProc,
+ Object... args
+ ) throws ClientException;
+
/**
* Returns cache that will operate with binary objects.
* <p>
diff --git
a/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
index 17a64b27c26..913ec88107d 100644
---
a/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++
b/modules/core/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client;
import java.util.Collection;
import java.util.Set;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -151,6 +152,16 @@ public enum ClientOperationType {
*/
CACHE_GET_AND_PUT_IF_ABSENT,
+ /**
+ * Get and put if absent ({@link ClientCache#invoke(Object,
EntryProcessor, Object...)}).
+ */
+ CACHE_INVOKE,
+
+ /**
+ * Get and put if absent ({@link ClientCache#invokeAll(Set,
EntryProcessor, Object...)}).
+ */
+ CACHE_INVOKE_ALL,
+
/**
* Scan query ({@link ClientCache#query(Query)}).
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
index 45d3cf61bb9..bb78d820092 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientJCacheAdapter.java
@@ -136,15 +136,21 @@ class ClientJCacheAdapter<K, V> implements Cache<K, V> {
}
/** {@inheritDoc} */
- @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProc,
- Object... arguments) throws EntryProcessorException {
- throw new UnsupportedOperationException();
+ @Override public <T> T invoke(
+ K key,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws EntryProcessorException {
+ return delegate.invoke(key, entryProc, arguments);
}
/** {@inheritDoc} */
- @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<?
extends K> keys,
- EntryProcessor<K, V, T> entryProc, Object... arguments) {
- throw new UnsupportedOperationException();
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) {
+ return delegate.invokeAll(keys, entryProc, arguments);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index c97ae47928a..9a2181ec9a3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -126,6 +126,12 @@ public enum ClientOperation {
/** Cache remove all conflict. */
CACHE_REMOVE_ALL_CONFLICT(1023),
+ /** Cache invoke. */
+ CACHE_INVOKE(1024),
+
+ /** Cache invoke all. */
+ CACHE_INVOKE_ALL(1025),
+
/** Cache partitions. */
CACHE_PARTITIONS(1101),
@@ -391,6 +397,12 @@ public enum ClientOperation {
case CACHE_GET_AND_PUT_IF_ABSENT:
return ClientOperationType.CACHE_GET_AND_PUT_IF_ABSENT;
+ case CACHE_INVOKE:
+ return ClientOperationType.CACHE_INVOKE;
+
+ case CACHE_INVOKE_ALL:
+ return ClientOperationType.CACHE_INVOKE_ALL;
+
case CACHE_CLEAR:
return ClientOperationType.CACHE_CLEAR_EVERYTHING;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServerError.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServerError.java
index 626a733a24b..cb5951beead 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServerError.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientServerError.java
@@ -24,6 +24,9 @@ public class ClientServerError extends ClientError {
/** Server error code. */
private final int code;
+ /** Server error message. */
+ private final String msg;
+
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -35,6 +38,7 @@ public class ClientServerError extends ClientError {
String.format("Ignite failed to process request [%s]: %s (server
status code [%s])", reqId, srvMsg, srvCode)
);
+ msg = srvMsg;
code = srvCode;
}
@@ -44,4 +48,11 @@ public class ClientServerError extends ClientError {
public int getCode() {
return code;
}
+
+ /**
+ * @return Server error message.
+ */
+ public String getServerErrorMessage() {
+ return msg;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 7676fcc323c..25c22ce8969 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -77,7 +77,10 @@ public enum ProtocolBitmaskFeature {
INDEX_QUERY_LIMIT(15),
/** Service topology. */
- SERVICE_TOPOLOGY(16);
+ SERVICE_TOPOLOGY(16),
+
+ /** Cache invoke/invokeAll operations. */
+ CACHE_INVOKE(17);
/** */
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 130447a2309..3d280b40d7a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.thin;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,6 +32,10 @@ import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -49,6 +54,7 @@ import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
@@ -56,8 +62,11 @@ import
org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.cache.query.InIndexQueryCriterion;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import
org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
+import org.apache.ignite.internal.processors.cache.CacheInvokeResult;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -772,6 +781,155 @@ public class TcpClientCache<K, V> implements
ClientCache<K, V> {
);
}
+ /** {@inheritDoc} */
+ @Override public <T> T invoke(
+ K key,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws EntryProcessorException, ClientException {
+ if (key == null)
+ throw new NullPointerException("key");
+
+ if (entryProc == null)
+ throw new NullPointerException("entryProc");
+
+ try {
+ return cacheSingleKeyOperation(
+ key,
+ ClientOperation.CACHE_INVOKE,
+ req -> writeEntryProcessor(req, entryProc, arguments),
+ this::readObject
+ );
+ }
+ catch (Exception e) {
+ ClientServerError serverErr = X.cause(e, ClientServerError.class);
+
+ if (serverErr != null && serverErr.getCode() ==
ClientStatus.ENTRY_PROCESSOR_EXCEPTION)
+ throw new
EntryProcessorException(serverErr.getServerErrorMessage());
+ else
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteClientFuture<T> invokeAsync(
+ K key,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws ClientException {
+ if (key == null)
+ throw new NullPointerException("key");
+
+ if (entryProc == null)
+ throw new NullPointerException("entryProc");
+
+ CompletableFuture<T> resFut = new CompletableFuture<>();
+
+ IgniteClientFuture<T> opFut = cacheSingleKeyOperationAsync(
+ key,
+ ClientOperation.CACHE_INVOKE,
+ req -> writeEntryProcessor(req, entryProc, arguments),
+ this::readObject
+ );
+
+ opFut.whenComplete((res, err) -> {
+ ClientServerError serverErr = X.cause(err,
ClientServerError.class);
+
+ if (serverErr != null && serverErr.getCode() ==
ClientStatus.ENTRY_PROCESSOR_EXCEPTION)
+ resFut.completeExceptionally(new
EntryProcessorException(serverErr.getServerErrorMessage()));
+ else if (err != null)
+ resFut.completeExceptionally(err);
+ else
+ resFut.complete(res);
+ });
+
+ return new IgniteClientFutureImpl<>(resFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws ClientException {
+ if (keys == null)
+ throw new NullPointerException("keys");
+
+ if (entryProc == null)
+ throw new NullPointerException("entryProc");
+
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareService(null, tx,
+ ClientOperation.CACHE_INVOKE_ALL,
+ req -> {
+ writeKeys(keys, req, tx);
+ writeEntryProcessor(req, entryProc, arguments);
+ },
+ this::readEntryProcessorResult);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteClientFuture<Map<K, EntryProcessorResult<T>>>
invokeAllAsync(
+ Set<? extends K> keys,
+ EntryProcessor<K, V, T> entryProc,
+ Object... arguments
+ ) throws ClientException {
+ if (keys == null)
+ throw new NullPointerException("keys");
+
+ if (entryProc == null)
+ throw new NullPointerException("entryProc");
+
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareServiceAsync(null, tx,
+ ClientOperation.CACHE_INVOKE_ALL,
+ req -> {
+ writeKeys(keys, req, tx);
+ writeEntryProcessor(req, entryProc, arguments);
+ },
+ this::readEntryProcessorResult);
+ }
+
+ /** */
+ private <T> void writeEntryProcessor(PayloadOutputChannel ch,
EntryProcessor<K, V, T> entryProc, Object... arguments) {
+ if
(!ch.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.CACHE_INVOKE))
+ throw new
ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.CACHE_INVOKE);
+
+ writeObject(ch, entryProc);
+ ch.out().writeByte(JAVA_PLATFORM);
+ ch.out().writeInt(arguments.length);
+ for (int i = 0; i < arguments.length; i++)
+ writeObject(ch, arguments[i]);
+ }
+
+ /** */
+ private <T> Map<K, EntryProcessorResult<T>>
readEntryProcessorResult(PayloadInputChannel ch) {
+ try (BinaryReaderExImpl r = serDes.createBinaryReader(ch.in())) {
+ int cnt = r.readInt();
+ Map<K, EntryProcessorResult<T>> res = new LinkedHashMap<>();
+
+ for (int i = 0; i < cnt; i++) {
+ K key = readObject(ch);
+ EntryProcessorResult<T> val;
+
+ boolean success = r.readBoolean();
+ if (success)
+ val = CacheInvokeResult.fromResult(readObject(ch));
+ else
+ val = CacheInvokeResult.fromError(new
EntryProcessorException(r.readString()));
+
+ res.put(key, val);
+ }
+
+ return res;
+ }
+ catch (IOException e) {
+ throw new BinaryObjectException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
return keepBinary ? (ClientCache<K1, V1>)this :
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index b0f673ac5ee..ffc966738dd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -75,7 +75,10 @@ public enum ClientBitmaskFeature implements
ThinProtocolFeature {
INDEX_QUERY_LIMIT(15),
/** Service topology. */
- SERVICE_TOPOLOGY(16);
+ SERVICE_TOPOLOGY(16),
+
+ /** Cache invoke/invokeAll operations. */
+ CACHE_INVOKE(17);
/** */
private static final EnumSet<ClientBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 6a7562f1a1c..60a897685d3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -54,6 +54,8 @@ import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGe
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetRequest;
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetSizeRequest;
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheIndexQueryRequest;
+import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheInvokeAllRequest;
+import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheInvokeRequest;
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
import
org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
@@ -199,6 +201,12 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
/** */
private static final short OP_CACHE_REMOVE_ALL_CONFLICT = 1023;
+ /** */
+ private static final short OP_CACHE_INVOKE = 1024;
+
+ /** */
+ private static final short OP_CACHE_INVOKE_ALL = 1025;
+
/* Cache create / destroy, configuration. */
/** */
private static final short OP_CACHE_GET_NAMES = 1050;
@@ -549,6 +557,12 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
case OP_CACHE_REMOVE_ALL_CONFLICT:
return new ClientCacheRemoveAllConflictRequest(reader);
+ case OP_CACHE_INVOKE:
+ return new ClientCacheInvokeRequest(reader);
+
+ case OP_CACHE_INVOKE_ALL:
+ return new ClientCacheInvokeAllRequest(reader);
+
case OP_CACHE_CREATE_WITH_NAME:
return new ClientCacheCreateWithNameRequest(reader);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
index f91c7b73769..a73f8c30a8f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientStatus.java
@@ -70,6 +70,9 @@ public final class ClientStatus {
/** Too many compute tasks. */
public static final int TOO_MANY_COMPUTE_TASKS = 1030;
+ /** Entry processor invokation error. */
+ public static final int ENTRY_PROCESSOR_EXCEPTION = 1040;
+
/** Authentication failed. */
public static final int AUTH_FAILED = 2000;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeAllRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeAllRequest.java
new file mode 100644
index 00000000000..58a324bdd58
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeAllRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.Map;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Cache invokeAll request.
+ */
+public class ClientCacheInvokeAllRequest extends ClientCacheKeysRequest {
+ /** */
+ private final ClientCacheInvokeRequest.EntryProcessorReader
entryProcReader;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientCacheInvokeAllRequest(BinaryReaderExImpl reader) {
+ super(reader);
+
+ entryProcReader = new
ClientCacheInvokeRequest.EntryProcessorReader(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ Map<Object, EntryProcessorResult<Object>> val =
cache(ctx).invokeAll(keys(),
+ entryProcReader.getEntryProcessor(),
entryProcReader.getArgs(isKeepBinary()));
+
+ return new ClientCacheInvokeAllResponse(requestId(), val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse>
processAsync(ClientConnectionContext ctx) {
+ return chainFuture(
+ cache(ctx).invokeAllAsync(keys(),
entryProcReader.getEntryProcessor(), entryProcReader.getArgs(isKeepBinary())),
+ v -> new ClientCacheInvokeAllResponse(requestId(), v));
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeAllResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeAllResponse.java
new file mode 100644
index 00000000000..fc90837d97d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeAllResponse.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.Map;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * InvokeAll response.
+ */
+class ClientCacheInvokeAllResponse extends ClientResponse {
+ /** Result. */
+ private final Map<Object, EntryProcessorResult<Object>> res;
+
+ /**
+ * Ctor.
+ *
+ * @param reqId Request id.
+ * @param res Result.
+ */
+ ClientCacheInvokeAllResponse(long reqId, Map<Object,
EntryProcessorResult<Object>> res) {
+ super(reqId);
+
+ assert res != null;
+
+ this.res = res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx,
BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
+
+ writer.writeInt(res.size());
+
+ for (Map.Entry<Object, EntryProcessorResult<Object>> entry :
res.entrySet()) {
+ writer.writeObjectDetached(entry.getKey());
+ EntryProcessorResult<Object> epRes = entry.getValue();
+
+ try {
+ Object val = epRes.get();
+ writer.writeBoolean(true);
+ writer.writeObjectDetached(val);
+ }
+ catch (EntryProcessorException e) {
+ writer.writeBoolean(false);
+ String msg = e.getMessage();
+
+ if
(ctx.kernalContext().clientListener().sendServerExceptionStackTraceToClient())
+ msg += U.nl() + X.getFullStackTrace(e);
+
+ writer.writeString(msg);
+ }
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeRequest.java
new file mode 100644
index 00000000000..f8e139785a1
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheInvokeRequest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import
org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientPlatform;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import
org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Cache invoke request.
+ */
+public class ClientCacheInvokeRequest extends ClientCacheKeyRequest {
+ /** */
+ private final EntryProcessorReader entryProcReader;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientCacheInvokeRequest(BinaryReaderExImpl reader) {
+ super(reader);
+
+ entryProcReader = new EntryProcessorReader(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process0(ClientConnectionContext ctx) {
+ try {
+ Object val = cache(ctx).invoke(key(),
entryProcReader.getEntryProcessor(),
+ entryProcReader.getArgs(isKeepBinary()));
+
+ return new ClientObjectResponse(requestId(), val);
+ }
+ catch (EntryProcessorException e) {
+ throw new
IgniteClientException(ClientStatus.ENTRY_PROCESSOR_EXCEPTION, e.getMessage(),
e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<ClientResponse>
processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(
+ cache(ctx).invokeAsync(key(), entryProcReader.getEntryProcessor(),
entryProcReader.getArgs(isKeepBinary())),
+ v -> new ClientObjectResponse(requestId(), v)).chain(f -> {
+ try {
+ return f.get();
+ }
+ catch (Exception e) {
+ Exception e0 = U.unwrap(e);
+
+ if (X.hasCause(e0, EntryProcessorException.class))
+ throw new
IgniteClientException(ClientStatus.ENTRY_PROCESSOR_EXCEPTION, e0.getMessage(),
e0);
+ else
+ throw new GridClosureException(e0);
+ }
+ });
+ }
+
+ /** Helper class to read entry processor and it's arguments. */
+ public static class EntryProcessorReader {
+ /** */
+ private final Object entryProc;
+
+ /** */
+ private final byte entryProcPlatform;
+
+ /** */
+ private final Object[] args;
+
+ /** Objects reader. */
+ private final BinaryReaderExImpl reader;
+
+ /** */
+ private final int argsStartPos;
+
+ /** */
+ public EntryProcessorReader(BinaryReaderExImpl reader) {
+ entryProc = reader.readObjectDetached();
+ entryProcPlatform = reader.readByte();
+
+ int argCnt = reader.readInt();
+
+ // We can't deserialize some types (arrays of user defined types
for example) from detached objects.
+ // On the other hand, deserialize should be done as part of
process() call (not in constructor) for proper
+ // error handling.
+ // To overcome these issues we store binary reader reference,
parse request in constructor (by reading detached
+ // objects), restore arguments starting position in input stream
and deserialize arguments from input stream
+ // in process() method.
+ this.reader = reader;
+ argsStartPos = reader.in().position();
+
+ args = new Object[argCnt];
+
+ for (int i = 0; i < argCnt; i++)
+ args[i] = reader.readObjectDetached();
+ }
+
+ /** */
+ public EntryProcessor<Object, Object, Object> getEntryProcessor() {
+ if (!(entryProc instanceof BinaryObject)) {
+ throw new IgniteClientException(ClientStatus.FAILED,
+ "Entry processor should be marshalled as a BinaryObject: "
+ entryProc.getClass());
+ }
+
+ BinaryObjectImpl bo = (BinaryObjectImpl)entryProc;
+
+ if (entryProcPlatform == ClientPlatform.JAVA)
+ return bo.deserialize();
+
+ throw new IgniteClientException(ClientStatus.FAILED, "Unsupported
entry processor platform: " +
+ entryProcPlatform);
+ }
+
+ /** */
+ public Object[] getArgs(boolean keepBinary) {
+ reader.in().position(argsStartPos);
+
+ // Deserialize entry processor's arguments when not in keepBinary
mode.
+ if (!keepBinary && args.length > 0) {
+ for (int i = 0; i < args.length; i++)
+ args[i] = reader.readObject();
+ }
+
+ return args;
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
index 0b242d393cb..b5f6e494ba4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.client.thin;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
@@ -234,6 +236,20 @@ public class BlockingTxOpsTest extends
AbstractThinClientTest {
() -> cache.replace(0, 1),
() -> assertEquals(1, cache.get(0))
);
+
+ // Invoke operation.
+ checkOpMultithreaded(client,
+ null,
+ () -> cache.invoke(0, new TestEntryProcessor(), 0),
+ () -> assertEquals(0, cache.get(0))
+ );
+
+ // Invoke all operation.
+ checkOpMultithreaded(client,
+ null,
+ () -> cache.invokeAll(new TreeSet<>(F.asList(0, 1)),
new TestEntryProcessor(), 0),
+ () -> assertEquals(F.asMap(0, 0, 1, 0),
cache.getAll(new TreeSet<>(F.asList(0, 1))))
+ );
}
}
}
@@ -370,4 +386,15 @@ public class BlockingTxOpsTest extends
AbstractThinClientTest {
}
}
}
+
+ /** */
+ static class TestEntryProcessor implements EntryProcessor<Object, Object,
Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> e,
Object... args) {
+ if (args != null && args.length >= 1)
+ e.setValue(args[0]);
+
+ return null;
+ }
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/InvokeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/InvokeTest.java
new file mode 100644
index 00000000000..dc1b5e90e69
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/InvokeTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.IgniteClientFuture;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Checks entry processor invocation for thin client.
+ */
+@RunWith(Parameterized.class)
+public class InvokeTest extends AbstractThinClientTest {
+ /** */
+ private static final int NODE_CNT = 3;
+
+ /** Client. */
+ private static IgniteClient client;
+
+ /** Client. */
+ private static ClientCache<Integer, Object> cache;
+
+ /** */
+ @Parameterized.Parameter
+ public boolean atomic;
+
+ /** */
+ @Parameterized.Parameters(name = "Atomic: {0}")
+ public static Collection<Object> params() {
+ return F.asList(true, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(NODE_CNT);
+
+ client = startClient(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cache = client.getOrCreateCache(new ClientCacheConfiguration()
+ .setName(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(atomic ? CacheAtomicityMode.ATOMIC :
CacheAtomicityMode.TRANSACTIONAL)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ client.destroyCache(DEFAULT_CACHE_NAME);
+ }
+
+ /**
+ * Test cache invoke operations, simple case.
+ */
+ @Test
+ public void testInvokeSimpleCase() {
+ assertEquals(0, (int)cache.invoke(0, new IncrementProcessor()));
+ assertEquals(1, (int)cache.invoke(0, new IncrementProcessor()));
+ }
+
+ /**
+ * Test cache invokeAll operations, simple case.
+ */
+ @Test
+ public void testInvokeAllSimpleCase() {
+ Map<Integer, EntryProcessorResult<Integer>> map = cache.invokeAll(new
HashSet<>(
+ F.asList(0, 1)), new IncrementProcessor());
+
+ assertEquals(2, map.size());
+ assertEquals((Integer)0, map.get(0).get());
+ assertEquals((Integer)0, map.get(1).get());
+
+ map = cache.invokeAll(new HashSet<>(F.asList(1, 2)), new
IncrementProcessor());
+ assertEquals(2, map.size());
+ assertEquals((Integer)1, map.get(1).get());
+ assertEquals((Integer)0, map.get(2).get());
+ }
+
+ /**
+ * Test async cache invoke/invokeAll.
+ */
+ @Test
+ public void testAsync() throws Exception {
+ assertEquals(3, cache.invokeAsync(0, new TestEntryProcessor(), 1, 2,
3).get());
+ assertEquals(2, cache.get(0));
+
+ Map<Integer, EntryProcessorResult<Object>> map =
cache.invokeAllAsync(new HashSet<>(
+ F.asList(0, 1)), new TestEntryProcessor(), 1, 2, 3).get();
+
+ assertEquals(2, map.size());
+ assertEquals(3, map.get(0).get());
+ assertEquals(3, map.get(1).get());
+ assertEquals(1, cache.get(0));
+ assertEquals(2, cache.get(1));
+ }
+
+ /**
+ * Test exception handling.
+ */
+ @Test
+ public void testExceptionHandling() {
+ try {
+ cache.invoke(0, new FailingEntryProcessor());
+ fail();
+ }
+ catch (EntryProcessorException e) {
+ assertTrue("Failed".equals(e.getMessage()));
+ }
+
+ Map<Integer, EntryProcessorResult<Object>> res = cache.invokeAll(
+ new HashSet<>(F.asList(0, 1)), new FailingEntryProcessor());
+
+ assertEquals(2, res.size());
+
+ try {
+ res.get(0).get();
+ fail();
+ }
+ catch (EntryProcessorException e) {
+ assertTrue("Failed".equals(e.getMessage()));
+ }
+
+ try {
+ res.get(1).get();
+ fail();
+ }
+ catch (EntryProcessorException e) {
+ assertTrue("Failed".equals(e.getMessage()));
+ }
+ }
+
+ /**
+ * Test exception handling by async operations.
+ */
+ @Test
+ public void testAsyncExceptionHandling() throws Exception {
+ IgniteClientFuture<?> fut = cache.invokeAsync(0, new
FailingEntryProcessor());
+
+ try {
+ fut.get();
+ fail();
+ }
+ catch (ExecutionException e) {
+ assertTrue(X.hasCause(e, "Failed", EntryProcessorException.class));
+ }
+
+ Map<Integer, EntryProcessorResult<Object>> res = cache.invokeAllAsync(
+ new HashSet<>(F.asList(0, 1)), new FailingEntryProcessor()).get();
+
+ assertEquals(2, res.size());
+
+ try {
+ res.get(0).get();
+ fail();
+ }
+ catch (EntryProcessorException e) {
+ assertTrue("Failed".equals(e.getMessage()));
+ }
+
+ try {
+ res.get(1).get();
+ fail();
+ }
+ catch (EntryProcessorException e) {
+ assertTrue("Failed".equals(e.getMessage()));
+ }
+ }
+
+ /**
+ * Test withKeepBinary flag.
+ */
+ @Test
+ public void testWithKeepBinary() {
+ Person person = new Person(0, "name");
+
+ ClientCache<Integer, Object> keepBinaryCache = cache.withKeepBinary();
+ Object res = keepBinaryCache.invoke(0, new
BinaryObjectEntryProcessor(), person);
+
+ assertEquals(client.binary().toBinary(person), res);
+
+ try {
+ cache.invoke(0, new BinaryObjectEntryProcessor(), person);
+ fail();
+ }
+ catch (EntryProcessorException ignore) {
+ // Expected.
+ }
+ }
+
+ /**
+ * Test arguments and result serialization.
+ */
+ @Test
+ public void testSerialization() {
+ checkSerialization(1, 2, 3);
+
+ Person p1 = new Person(1, "name1");
+ Person p2 = new Person(2, "name2");
+ Person p3 = new Person(3, "name3");
+ checkSerialization(p1, p2, p3);
+ checkSerialization(new Object[] {p1, p2}, new Object[] {p2, p3}, new
Object[] {p3, p1});
+ checkSerialization(F.asList(p1, p2), F.asList(p2, p3), F.asList(p3,
p1));
+ }
+
+ /** */
+ private void checkSerialization(Object valIfExists, Object valIfNotExists,
Object retVal) {
+ // Remove keys.
+ cache.invoke(0, new TestEntryProcessor(), null, null, null);
+ cache.invokeAll(new HashSet<>(F.asList(0, 1)), new
TestEntryProcessor(), null, null, null);
+
+ Object res = cache.invoke(0, new TestEntryProcessor(), valIfExists,
valIfNotExists, retVal);
+ assertEqualsArraysAware(res, retVal);
+ assertEqualsArraysAware(valIfNotExists, cache.get(0));
+
+ cache.put(0, 0); // Reset value for existing key.
+ cache.invoke(0, new TestEntryProcessor(), valIfExists, valIfNotExists,
retVal);
+ assertEqualsArraysAware(valIfExists, cache.get(0));
+
+ cache.put(0, 0); // Reset value for existing key.
+ Map<Integer, EntryProcessorResult<Object>> resMap =
cache.invokeAll(new HashSet<>(F.asList(0, 1)),
+ new TestEntryProcessor(), valIfExists, valIfNotExists, retVal);
+ assertEquals(2, resMap.size());
+ assertEqualsArraysAware(retVal, resMap.get(0).get());
+ assertEqualsArraysAware(retVal, resMap.get(1).get());
+ assertEqualsArraysAware(valIfExists, cache.get(0));
+ assertEqualsArraysAware(valIfNotExists, cache.get(1));
+ }
+
+ /**
+ * Test that invoke/invokeAll is transactional.
+ */
+ @Test
+ public void testExplicitTx() {
+ Assume.assumeFalse(atomic);
+
+ try (ClientTransaction tx = client.transactions().txStart()) {
+ cache.invoke(0, new TestEntryProcessor(), 1, 2, 3);
+ assertEquals(2, cache.get(0));
+
+ cache.invoke(0, new TestEntryProcessor(), 1, 2, 3);
+ assertEquals(1, cache.get(0));
+
+ cache.invokeAll(new HashSet<>(F.asList(0, 1)), new
TestEntryProcessor(), 1, 2, 3);
+
+ assertEquals(F.asMap(0, 1, 1, 2), cache.getAll(new
HashSet<>(F.asList(0, 1))));
+
+ tx.rollback();
+ }
+
+ assertFalse(cache.containsKey(0));
+ assertFalse(cache.containsKey(1));
+
+ try (ClientTransaction tx = client.transactions().txStart()) {
+ cache.invoke(0, new TestEntryProcessor(), 1, 2, 3);
+ cache.invokeAll(new HashSet<>(F.asList(0, 1)), new
TestEntryProcessor(), 1, 2, 3);
+
+ tx.commit();
+ }
+
+ assertEquals(F.asMap(0, 1, 1, 2), cache.getAll(new
HashSet<>(F.asList(0, 1))));
+
+ }
+
+ /** */
+ protected static class IncrementProcessor implements
EntryProcessor<Integer, Object, Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer process(MutableEntry<Integer, Object> e,
Object... arguments) {
+ Integer val = (Integer)e.getValue();
+
+ e.setValue(val == null ? 0 : val + 1);
+
+ return (Integer)e.getValue();
+ }
+ }
+
+ /** */
+ protected static class TestEntryProcessor implements
EntryProcessor<Integer, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Integer, Object> e,
Object... arguments) {
+ // arguments[0] - value if exists.
+ // arguments[1] - value if not exists.
+ // arguments[2] - returned value.
+ if (arguments == null || arguments.length < 3)
+ throw new EntryProcessorException("Unexpected arguments: " +
Arrays.toString(arguments));
+
+ if (arguments[0] == null)
+ e.remove();
+ else
+ e.setValue(e.exists() ? arguments[0] : arguments[1]);
+
+ return arguments[2];
+ }
+ }
+
+ /** */
+ protected static class FailingEntryProcessor implements
EntryProcessor<Integer, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Integer, Object> e,
Object... arguments) {
+ throw new EntryProcessorException("Failed");
+ }
+ }
+
+ /** */
+ protected static class BinaryObjectEntryProcessor implements
EntryProcessor<Integer, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Integer, Object> e,
Object... arguments) {
+ if (arguments == null || arguments.length < 1 || !(arguments[0]
instanceof BinaryObject))
+ throw new EntryProcessorException("Expected binary object
argument");
+
+ return arguments[0];
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 121bd995b80..a013e7f69b4 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -30,6 +30,7 @@ import
org.apache.ignite.internal.client.thin.DataReplicationOperationsTest;
import org.apache.ignite.internal.client.thin.FunctionalTest;
import org.apache.ignite.internal.client.thin.IgniteSetTest;
import org.apache.ignite.internal.client.thin.InactiveClusterCacheRequestTest;
+import org.apache.ignite.internal.client.thin.InvokeTest;
import org.apache.ignite.internal.client.thin.MetadataRegistrationTest;
import
org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
@@ -94,6 +95,7 @@ import org.junit.runners.Suite;
AffinityMetricsTest.class,
ClusterGroupClusterRestartTest.class,
BlockingTxOpsTest.class,
+ InvokeTest.class,
})
public class ClientTestSuite {
// No-op.