This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bc4855de8b IGNITE-22842 Improve synchronous client API performance 
(#4488)
bc4855de8b is described below

commit bc4855de8b290434d70cdaf0e7a68ad0157c00b3
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Oct 3 19:41:46 2024 +0300

    IGNITE-22842 Improve synchronous client API performance (#4488)
---
 .../ignite/internal/client/ReliableChannel.java    |   6 +-
 .../ignite/internal/client/TcpClientChannel.java   | 155 ++++++++-----
 .../ignite/internal/client/table/ClientTable.java  |  19 +-
 .../table/api/PublicApiClientKeyValueView.java     | 250 +++++++++++++++++++++
 .../table/api/PublicApiClientRecordView.java       | 224 ++++++++++++++++++
 .../client/table/api/PublicApiClientViewBase.java  | 107 +++++++++
 6 files changed, 690 insertions(+), 71 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index ce56bb44a6..b8759c8e0a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static 
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
@@ -36,7 +37,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
@@ -728,9 +728,7 @@ public final class ReliableChannel implements AutoCloseable 
{
 
     @Nullable
     private static IgniteClientConnectionException 
unwrapConnectionException(Throwable err) {
-        while (err instanceof CompletionException) {
-            err = err.getCause();
-        }
+        err = unwrapCause(err);
 
         if (!(err instanceof IgniteClientConnectionException)) {
             return null;
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 54eac0c4ce..80012c5436 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.client;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
@@ -62,6 +64,7 @@ import 
org.apache.ignite.internal.future.timeout.TimeoutObject;
 import org.apache.ignite.internal.future.timeout.TimeoutWorker;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.ViewUtils;
 import org.apache.ignite.lang.ErrorGroups.Table;
@@ -96,7 +99,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     private final AtomicLong reqId = new AtomicLong(1);
 
     /** Pending requests. */
-    private final ConcurrentMap<Long, ClientRequestFuture<?>> pendingReqs = 
new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, TimeoutObjectImpl> pendingReqs = new 
ConcurrentHashMap<>();
 
     /** Notification handlers. */
     private final Map<Long, CompletableFuture<PayloadInputChannel>> 
notificationHandlers = new ConcurrentHashMap<>();
@@ -251,8 +254,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 sock.close();
             }
 
-            for (ClientRequestFuture<?> pendingReq : pendingReqs.values()) {
-                pendingReq.completeExceptionally(
+            for (TimeoutObjectImpl pendingReq : pendingReqs.values()) {
+                pendingReq.future().completeExceptionally(
                         new IgniteClientConnectionException(CONNECTION_ERR, 
"Channel is closed", endpoint(), cause));
             }
 
@@ -272,13 +275,13 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** {@inheritDoc} */
     @Override
     public void onMessage(ByteBuf buf) {
-        asyncContinuationExecutor.execute(() -> {
-            try (var unpacker = new ClientMessageUnpacker(buf)) {
-                processNextMessage(unpacker);
-            } catch (Throwable t) {
-                close(t, false);
-            }
-        });
+        try {
+            var unpacker = new ClientMessageUnpacker(buf);
+
+            processNextMessage(unpacker);
+        } catch (Throwable t) {
+            close(t, false);
+        }
     }
 
     /** {@inheritDoc} */
@@ -315,12 +318,12 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 notificationHandlers.put(id, notificationFut);
             }
 
-            ClientRequestFuture<T> fut = send(opCode, id, payloadWriter, 
payloadReader, notificationFut, operationTimeout);
+            CompletableFuture<T> fut = send(opCode, id, payloadWriter, 
payloadReader, notificationFut, operationTimeout);
 
             return fut;
 
         } catch (Throwable t) {
-            return CompletableFuture.failedFuture(t);
+            return failedFuture(t);
         }
     }
 
@@ -333,7 +336,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
      * @param notificationFut Optional notification future.
      * @return Request future.
      */
-    private <T> ClientRequestFuture<T> send(
+    private <T> CompletableFuture<T> send(
             int opCode,
             long id,
             @Nullable PayloadWriter payloadWriter,
@@ -345,9 +348,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             throw new IgniteClientConnectionException(CONNECTION_ERR, "Channel 
is closed", endpoint());
         }
 
-        ClientRequestFuture<T> fut = new ClientRequestFuture<>(payloadReader, 
notificationFut, timeout);
+        var fut = new CompletableFuture<ClientMessageUnpacker>();
 
-        pendingReqs.put(id, fut);
+        pendingReqs.put(id, new TimeoutObjectImpl(timeout, fut));
 
         metrics.requestsActiveIncrement();
 
@@ -380,7 +383,26 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 }
             });
 
-            return fut;
+            if (PublicApiThreading.executingSyncPublicApi()) {
+                try {
+                    ClientMessageUnpacker unpacker = fut.get();
+
+                    return completedFuture(complete(payloadReader, 
notificationFut, unpacker));
+                } catch (Exception e) {
+                    return failedFuture(e);
+                }
+            }
+
+            return fut.handleAsync(
+                    (unpacker, err) -> {
+                        if (err != null) {
+                            throw sneakyThrow(err);
+                        }
+
+                        return complete(payloadReader, notificationFut, 
unpacker);
+                    },
+                    asyncContinuationExecutor
+            );
         } catch (Throwable t) {
             log.warn("Failed to send request [id=" + id + ", op=" + opCode + 
", remoteAddress=" + cfg.getAddress() + "]: "
                     + t.getMessage(), t);
@@ -398,22 +420,26 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /**
      * Completes the request future.
      *
-     * @param pendingReq    Request future.
+     * @param payloadReader Payload reader.
+     * @param notificationFut Notify future.
+     * @param unpacker Unpacked message.
      */
-    private <T> void complete(ClientRequestFuture<T> pendingReq, 
ClientMessageUnpacker unpacker) {
-        if (pendingReq.payloadReader == null) {
-            pendingReq.complete(null);
-        } else {
-            try {
-                T res = pendingReq.payloadReader.apply(new 
PayloadInputChannel(this, unpacker, pendingReq.notificationFut));
-                pendingReq.complete(res);
-            } catch (Throwable e) {
-                log.error("Failed to deserialize server response 
[remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
-
-                pendingReq.completeExceptionally(
-                        new IgniteException(PROTOCOL_ERR, "Failed to 
deserialize server response: " + e.getMessage(), e));
+    private <T> T complete(
+            PayloadReader<T> payloadReader,
+            CompletableFuture<PayloadInputChannel> notificationFut,
+            ClientMessageUnpacker unpacker
+    ) {
+        try (unpacker) {
+            if (payloadReader != null) {
+                return payloadReader.apply(new PayloadInputChannel(this, 
unpacker, notificationFut));
             }
+        } catch (Throwable e) {
+            log.error("Failed to deserialize server response [remoteAddress=" 
+ cfg.getAddress() + "]: " + e.getMessage(), e);
+
+            throw new IgniteException(PROTOCOL_ERR, "Failed to deserialize 
server response: " + e.getMessage(), e);
         }
+
+        return null;
     }
 
     /**
@@ -422,7 +448,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     private void processNextMessage(ClientMessageUnpacker unpacker) throws 
IgniteException {
         if (protocolCtx == null) {
             // Process handshake.
-            complete(pendingReqs.remove(-1L), unpacker);
+            pendingReqs.remove(-1L).future().complete(unpacker);
             return;
         }
 
@@ -436,12 +462,15 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
         if (ResponseFlags.getNotificationFlag(flags)) {
             handleNotification(resId, unpacker, err);
+
             return;
         }
 
-        ClientRequestFuture<?> pendingReq = pendingReqs.remove(resId);
+        TimeoutObjectImpl pendingReq = pendingReqs.remove(resId);
 
         if (pendingReq == null) {
+            unpacker.close();
+
             log.error("Unexpected response ID [remoteAddress=" + 
cfg.getAddress() + "]: " + resId);
 
             throw new IgniteClientConnectionException(PROTOCOL_ERR, 
String.format("Unexpected response ID [%s]", resId), endpoint());
@@ -451,11 +480,15 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
         if (err == null) {
             metrics.requestsCompletedIncrement();
-            complete(pendingReq, unpacker);
+
+            pendingReq.future().complete(unpacker);
         } else {
             metrics.requestsFailedIncrement();
             notificationHandlers.remove(resId);
-            pendingReq.completeExceptionally(err);
+
+            unpacker.close();
+
+            pendingReq.future().completeExceptionally(err);
         }
     }
 
@@ -484,7 +517,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             throw new IgniteClientConnectionException(PROTOCOL_ERR, 
String.format("Unexpected notification ID [%s]", id), endpoint());
         }
 
-        try {
+        try (unpacker) {
             if (err != null) {
                 handler.completeExceptionally(err);
             } else {
@@ -581,10 +614,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     }
 
     /** Client handshake. */
-    private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver)
-            throws IgniteClientConnectionException {
-        ClientRequestFuture<Object> fut = new ClientRequestFuture<>(r -> 
handshakeRes(r.in()), null, connectTimeout);
-        pendingReqs.put(-1L, fut);
+    private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver) 
throws IgniteClientConnectionException {
+        var fut = new CompletableFuture<ClientMessageUnpacker>();
+        pendingReqs.put(-1L, new TimeoutObjectImpl(connectTimeout, fut));
 
         handshakeReqAsync(ver).addListener(f -> {
             if (!f.isSuccess()) {
@@ -594,20 +626,24 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         });
 
         return fut
-                .handle((res, err) -> {
+                .handleAsync((unpacker, err) -> {
                     if (err != null) {
                         if (err instanceof TimeoutException || err.getCause() 
instanceof TimeoutException) {
                             metrics.handshakesFailedTimeoutIncrement();
                             throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", 
endpoint(), err);
-                        } else {
-                            metrics.handshakesFailedIncrement();
                         }
 
+                        metrics.handshakesFailedIncrement();
                         throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(), 
err);
                     }
 
-                    return res;
-                });
+                    try {
+                        return complete(r -> handshakeRes(r.in()), null, 
unpacker);
+                    } catch (Throwable th) {
+                        metrics.handshakesFailedIncrement();
+                        throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(), 
th);
+                    }
+                }, asyncContinuationExecutor);
     }
 
     /**
@@ -769,25 +805,24 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     }
 
     /**
-     * Client request future.
+     * Timeout object wrapper for the completable future.
      */
-    private static class ClientRequestFuture<T> extends CompletableFuture<T> 
implements TimeoutObject<CompletableFuture<T>> {
-        @Nullable
-        private final PayloadReader<T> payloadReader;
-
-        @Nullable
-        private final CompletableFuture<PayloadInputChannel> notificationFut;
-
+    private static class TimeoutObjectImpl implements 
TimeoutObject<CompletableFuture<ClientMessageUnpacker>> {
+        /** End time (milliseconds since Unix epoch). */
         private final long endTime;
 
-        private ClientRequestFuture(
-                @Nullable PayloadReader<T> payloadReader,
-                @Nullable CompletableFuture<PayloadInputChannel> 
notificationFut,
-                long timeout
-        ) {
-            this.payloadReader = payloadReader;
-            this.notificationFut = notificationFut;
+        /** Target future. */
+        private final CompletableFuture<ClientMessageUnpacker> fut;
+
+        /**
+         * Constructor.
+         *
+         * @param timeout Timeout in milliseconds.
+         * @param fut Target future.
+         */
+        public TimeoutObjectImpl(long timeout, 
CompletableFuture<ClientMessageUnpacker> fut) {
             this.endTime = timeout > 0 ? coarseCurrentTimeMillis() + timeout : 
0;
+            this.fut = fut;
         }
 
         @Override
@@ -796,8 +831,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         }
 
         @Override
-        public CompletableFuture<T> future() {
-            return this;
+        public CompletableFuture<ClientMessageUnpacker> future() {
+            return fut;
         }
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 7a475b8985..8ab926ef08 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -40,6 +40,8 @@ import 
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ColumnTypeConverter;
 import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.table.api.PublicApiClientKeyValueView;
+import org.apache.ignite.internal.client.table.api.PublicApiClientRecordView;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -152,12 +154,12 @@ public class ClientTable implements Table {
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
         Objects.requireNonNull(recMapper);
 
-        return new ClientRecordView<>(this, sql, recMapper);
+        return new PublicApiClientRecordView<>(new ClientRecordView<>(this, 
sql, recMapper));
     }
 
     @Override
     public RecordView<Tuple> recordView() {
-        return new ClientRecordBinaryView(this, sql);
+        return new PublicApiClientRecordView<>(new 
ClientRecordBinaryView(this, sql));
     }
 
     /** {@inheritDoc} */
@@ -166,13 +168,13 @@ public class ClientTable implements Table {
         Objects.requireNonNull(keyMapper);
         Objects.requireNonNull(valMapper);
 
-        return new ClientKeyValueView<>(this, sql, keyMapper, valMapper);
+        return new PublicApiClientKeyValueView<>(new 
ClientKeyValueView<>(this, sql, keyMapper, valMapper));
     }
 
     /** {@inheritDoc} */
     @Override
     public KeyValueView<Tuple, Tuple> keyValueView() {
-        return new ClientKeyValueBinaryView(this, sql);
+        return new PublicApiClientKeyValueView<>(new 
ClientKeyValueBinaryView(this, sql));
     }
 
     CompletableFuture<ClientSchema> getLatestSchema() {
@@ -217,7 +219,7 @@ public class ClientTable implements Table {
             ClientSchema last = null;
 
             for (var i = 0; i < schemaCnt; i++) {
-                last = readSchema(r.in());
+                last = readSchema(r.in(), ver);
 
                 if (log.isDebugEnabled()) {
                     log.debug("Schema loaded [tableId=" + id + ", 
schemaVersion=" + last.version() + "]");
@@ -228,7 +230,7 @@ public class ClientTable implements Table {
         });
     }
 
-    private ClientSchema readSchema(ClientMessageUnpacker in) {
+    private ClientSchema readSchema(ClientMessageUnpacker in, int targetVer) {
         var schemaVer = in.unpackInt();
         var colCnt = in.unpackInt();
         var columns = new ClientColumn[colCnt];
@@ -257,7 +259,10 @@ public class ClientTable implements Table {
         }
 
         var schema = new ClientSchema(schemaVer, columns, marshallers);
-        schemas.put(schemaVer, CompletableFuture.completedFuture(schema));
+
+        if (schemaVer != targetVer) {
+            schemas.put(schemaVer, CompletableFuture.completedFuture(schema));
+        }
 
         synchronized (latestSchemaLock) {
             if (schemaVer > latestSchemaVer) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientKeyValueView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientKeyValueView.java
new file mode 100644
index 0000000000..7f4de6f43f
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientKeyValueView.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table.api;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.NullableValue;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link KeyValueView} that grants a specific role to the 
synchronous or asynchronous invoked.
+ *
+ * @see PublicApiThreading#execUserSyncOperation(Supplier)
+ * @see PublicApiThreading#execUserAsyncOperation(Supplier)
+ */
+public class PublicApiClientKeyValueView<K, V> extends 
PublicApiClientViewBase<Entry<K, V>> implements KeyValueView<K, V> {
+    private final KeyValueView<K, V> view;
+
+    /**
+     * Constructor.
+     *
+     * @param view View to wrap.
+     */
+    public PublicApiClientKeyValueView(KeyValueView<K, V> view) {
+        super(view, view);
+
+        this.view = view;
+    }
+
+    @Override
+    public @Nullable V get(@Nullable Transaction tx, K key) {
+        return executeSyncOp(() -> view.get(tx, key));
+    }
+
+    @Override
+    public CompletableFuture<V> getAsync(@Nullable Transaction tx, K key) {
+        return executeAsyncOp(() -> view.getAsync(tx, key));
+    }
+
+    @Override
+    public NullableValue<V> getNullable(@Nullable Transaction tx, K key) {
+        return executeSyncOp(() -> view.getNullable(tx, key));
+    }
+
+    @Override
+    public CompletableFuture<NullableValue<V>> getNullableAsync(@Nullable 
Transaction tx, K key) {
+        return executeAsyncOp(() -> view.getNullableAsync(tx, key));
+    }
+
+    @Override
+    public @Nullable V getOrDefault(@Nullable Transaction tx, K key, @Nullable 
V defaultValue) {
+        return executeSyncOp(() -> view.getOrDefault(tx, key, defaultValue));
+    }
+
+    @Override
+    public CompletableFuture<V> getOrDefaultAsync(@Nullable Transaction tx, K 
key, @Nullable V defaultValue) {
+        return executeAsyncOp(() -> view.getOrDefaultAsync(tx, key, 
defaultValue));
+    }
+
+    @Override
+    public Map<K, V> getAll(@Nullable Transaction tx, Collection<K> keys) {
+        return executeSyncOp(() -> view.getAll(tx, keys));
+    }
+
+    @Override
+    public CompletableFuture<Map<K, V>> getAllAsync(@Nullable Transaction tx, 
Collection<K> keys) {
+        return executeAsyncOp(() -> view.getAllAsync(tx, keys));
+    }
+
+    @Override
+    public boolean contains(@Nullable Transaction tx, K key) {
+        return executeSyncOp(() -> view.contains(tx, key));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, 
K key) {
+        return executeAsyncOp(() -> view.containsAsync(tx, key));
+    }
+
+    @Override
+    public boolean containsAll(@Nullable Transaction tx, Collection<K> keys) {
+        return executeSyncOp(() -> view.containsAll(tx, keys));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsAllAsync(@Nullable Transaction 
tx, Collection<K> keys) {
+        return executeAsyncOp(() -> view.containsAllAsync(tx, keys));
+    }
+
+    @Override
+    public void put(@Nullable Transaction tx, K key, @Nullable V val) {
+        executeSyncOp(() -> view.put(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(@Nullable Transaction tx, K key, 
@Nullable V val) {
+        return executeAsyncOp(() -> view.putAsync(tx, key, val));
+    }
+
+    @Override
+    public void putAll(@Nullable Transaction tx, Map<K, V> pairs) {
+        executeSyncOp(() -> view.putAll(tx, pairs));
+    }
+
+    @Override
+    public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx, 
Map<K, V> pairs) {
+        return executeAsyncOp(() -> view.putAllAsync(tx, pairs));
+    }
+
+    @Override
+    public @Nullable V getAndPut(@Nullable Transaction tx, K key, @Nullable V 
val) {
+        return executeSyncOp(() -> view.getAndPut(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<V> getAndPutAsync(@Nullable Transaction tx, K 
key, @Nullable V val) {
+        return executeAsyncOp(() -> view.getAndPutAsync(tx, key, val));
+    }
+
+    @Override
+    public NullableValue<V> getNullableAndPut(@Nullable Transaction tx, K key, 
@Nullable V val) {
+        return executeSyncOp(() -> view.getNullableAndPut(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<NullableValue<V>> 
getNullableAndPutAsync(@Nullable Transaction tx, K key, @Nullable V val) {
+        return executeAsyncOp(() -> view.getNullableAndPutAsync(tx, key, val));
+    }
+
+    @Override
+    public boolean putIfAbsent(@Nullable Transaction tx, K key, @Nullable V 
val) {
+        return executeSyncOp(() -> view.putIfAbsent(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction 
tx, K key, @Nullable V val) {
+        return executeAsyncOp(() -> view.putIfAbsentAsync(tx, key, val));
+    }
+
+    @Override
+    public boolean remove(@Nullable Transaction tx, K key) {
+        return executeSyncOp(() -> view.remove(tx, key));
+    }
+
+    @Override
+    public boolean remove(@Nullable Transaction tx, K key, V val) {
+        return executeSyncOp(() -> view.remove(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K 
key) {
+        return executeAsyncOp(() -> view.removeAsync(tx, key));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K 
key, V val) {
+        return executeAsyncOp(() -> view.removeAsync(tx, key, val));
+    }
+
+    @Override
+    public Collection<K> removeAll(@Nullable Transaction tx, Collection<K> 
keys) {
+        return executeSyncOp(() -> view.removeAll(tx, keys));
+    }
+
+    @Override
+    public CompletableFuture<Collection<K>> removeAllAsync(@Nullable 
Transaction tx, Collection<K> keys) {
+        return executeAsyncOp(() -> view.removeAllAsync(tx, keys));
+    }
+
+    @Override
+    public @Nullable V getAndRemove(@Nullable Transaction tx, K key) {
+        return executeSyncOp(() -> view.getAndRemove(tx, key));
+    }
+
+    @Override
+    public CompletableFuture<V> getAndRemoveAsync(@Nullable Transaction tx, K 
key) {
+        return executeAsyncOp(() -> view.getAndRemoveAsync(tx, key));
+    }
+
+    @Override
+    public NullableValue<V> getNullableAndRemove(@Nullable Transaction tx, K 
key) {
+        return executeSyncOp(() -> view.getNullableAndRemove(tx, key));
+    }
+
+    @Override
+    public CompletableFuture<NullableValue<V>> 
getNullableAndRemoveAsync(@Nullable Transaction tx, K key) {
+        return executeAsyncOp(() -> view.getNullableAndRemoveAsync(tx, key));
+    }
+
+    @Override
+    public boolean replace(@Nullable Transaction tx, K key, @Nullable V val) {
+        return executeSyncOp(() -> view.replace(tx, key, val));
+    }
+
+    @Override
+    public boolean replace(@Nullable Transaction tx, K key, V oldValue, 
@Nullable V newValue) {
+        return executeSyncOp(() -> view.replace(tx, key, oldValue, newValue));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K 
key, @Nullable V val) {
+        return executeAsyncOp(() -> view.replaceAsync(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K 
key, @Nullable V oldVal, @Nullable V newVal) {
+        return executeAsyncOp(() -> view.replaceAsync(tx, key, oldVal, 
newVal));
+    }
+
+    @Override
+    public @Nullable V getAndReplace(@Nullable Transaction tx, @Nullable K 
key, @Nullable V val) {
+        return executeSyncOp(() -> view.getAndReplace(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<V> getAndReplaceAsync(@Nullable Transaction tx, K 
key, @Nullable V val) {
+        return executeAsyncOp(() -> view.getAndReplaceAsync(tx, key, val));
+    }
+
+    @Override
+    public NullableValue<V> getNullableAndReplace(@Nullable Transaction tx, K 
key, @Nullable V val) {
+        return executeSyncOp(() -> view.getNullableAndReplace(tx, key, val));
+    }
+
+    @Override
+    public CompletableFuture<NullableValue<V>> 
getNullableAndReplaceAsync(@Nullable Transaction tx, K key, @Nullable V val) {
+        return executeAsyncOp(() -> view.getNullableAndReplaceAsync(tx, key, 
val));
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientRecordView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientRecordView.java
new file mode 100644
index 0000000000..94ca4ed098
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientRecordView.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table.api;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link RecordView} that grants a specific role to the 
synchronous or asynchronous invoked.
+ *
+ * @see PublicApiThreading#execUserSyncOperation(Supplier)
+ * @see PublicApiThreading#execUserAsyncOperation(Supplier)
+ */
+public class PublicApiClientRecordView<R> extends PublicApiClientViewBase<R> 
implements RecordView<R>, Wrapper {
+    private final RecordView<R> view;
+
+    /**
+     * Constructor.
+     *
+     * @param view View to wrap.
+     */
+    public PublicApiClientRecordView(RecordView<R> view) {
+        super(view, view);
+
+        this.view = view;
+    }
+
+    @Override
+    public R get(@Nullable Transaction tx, R keyRec) {
+        return executeSyncOp(() -> view.get(tx, keyRec));
+    }
+
+    @Override
+    public CompletableFuture<R> getAsync(@Nullable Transaction tx, R keyRec) {
+        return executeAsyncOp(() -> view.getAsync(tx, keyRec));
+    }
+
+    @Override
+    public List<R> getAll(@Nullable Transaction tx, Collection<R> keyRecs) {
+        return executeSyncOp(() -> view.getAll(tx, keyRecs));
+    }
+
+    @Override
+    public CompletableFuture<List<R>> getAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs) {
+        return executeAsyncOp(() -> view.getAllAsync(tx, keyRecs));
+    }
+
+    @Override
+    public boolean contains(@Nullable Transaction tx, R keyRec) {
+        return executeSyncOp(() -> view.contains(tx, keyRec));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, 
R keyRec) {
+        return executeAsyncOp(() -> view.containsAsync(tx, keyRec));
+    }
+
+    @Override
+    public boolean containsAll(@Nullable Transaction tx, Collection<R> keys) {
+        return executeSyncOp(() -> view.containsAll(tx, keys));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsAllAsync(@Nullable Transaction 
tx, Collection<R> keys) {
+        return executeAsyncOp(() -> view.containsAllAsync(tx, keys));
+    }
+
+    @Override
+    public void upsert(@Nullable Transaction tx, R rec) {
+        executeSyncOp(() -> view.upsert(tx, rec));
+    }
+
+    @Override
+    public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, R 
rec) {
+        return executeAsyncOp(() -> view.upsertAsync(tx, rec));
+    }
+
+    @Override
+    public void upsertAll(@Nullable Transaction tx, Collection<R> recs) {
+        executeSyncOp(() -> view.upsertAll(tx, recs));
+    }
+
+    @Override
+    public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, 
Collection<R> recs) {
+        return executeAsyncOp(() -> view.upsertAllAsync(tx, recs));
+    }
+
+    @Override
+    public R getAndUpsert(@Nullable Transaction tx, R rec) {
+        return executeSyncOp(() -> view.getAndUpsert(tx, rec));
+    }
+
+    @Override
+    public CompletableFuture<R> getAndUpsertAsync(@Nullable Transaction tx, R 
rec) {
+        return executeAsyncOp(() -> view.getAndUpsertAsync(tx, rec));
+    }
+
+    @Override
+    public boolean insert(@Nullable Transaction tx, R rec) {
+        return executeSyncOp(() -> view.insert(tx, rec));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, R 
rec) {
+        return executeAsyncOp(() -> view.insertAsync(tx, rec));
+    }
+
+    @Override
+    public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) {
+        return executeSyncOp(() -> view.insertAll(tx, recs));
+    }
+
+    @Override
+    public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, 
Collection<R> recs) {
+        return executeAsyncOp(() -> view.insertAllAsync(tx, recs));
+    }
+
+    @Override
+    public boolean replace(@Nullable Transaction tx, R rec) {
+        return executeSyncOp(() -> view.replace(tx, rec));
+    }
+
+    @Override
+    public boolean replace(@Nullable Transaction tx, R oldRec, R newRec) {
+        return executeSyncOp(() -> view.replace(tx, oldRec, newRec));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R 
rec) {
+        return executeAsyncOp(() -> view.replaceAsync(tx, rec));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R 
oldRec, R newRec) {
+        return executeAsyncOp(() -> view.replaceAsync(tx, oldRec, newRec));
+    }
+
+    @Override
+    public R getAndReplace(@Nullable Transaction tx, R rec) {
+        return executeSyncOp(() -> view.getAndReplace(tx, rec));
+    }
+
+    @Override
+    public CompletableFuture<R> getAndReplaceAsync(@Nullable Transaction tx, R 
rec) {
+        return executeAsyncOp(() -> view.getAndReplaceAsync(tx, rec));
+    }
+
+    @Override
+    public boolean delete(@Nullable Transaction tx, R keyRec) {
+        return executeSyncOp(() -> view.delete(tx, keyRec));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, R 
keyRec) {
+        return executeAsyncOp(() -> view.deleteAsync(tx, keyRec));
+    }
+
+    @Override
+    public boolean deleteExact(@Nullable Transaction tx, R rec) {
+        return executeSyncOp(() -> view.deleteExact(tx, rec));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction 
tx, R rec) {
+        return executeAsyncOp(() -> view.deleteExactAsync(tx, rec));
+    }
+
+    @Override
+    public R getAndDelete(@Nullable Transaction tx, R keyRec) {
+        return executeSyncOp(() -> view.getAndDelete(tx, keyRec));
+    }
+
+    @Override
+    public CompletableFuture<R> getAndDeleteAsync(@Nullable Transaction tx, R 
keyRec) {
+        return executeAsyncOp(() -> view.getAndDeleteAsync(tx, keyRec));
+    }
+
+    @Override
+    public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) {
+        return executeSyncOp(() -> view.deleteAll(tx, keyRecs));
+    }
+
+    @Override
+    public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs) {
+        return executeAsyncOp(() -> view.deleteAllAsync(tx, keyRecs));
+    }
+
+    @Override
+    public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> 
recs) {
+        return executeSyncOp(() -> view.deleteAllExact(tx, recs));
+    }
+
+    @Override
+    public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<R> recs) {
+        return executeAsyncOp(() -> view.deleteAllExactAsync(tx, recs));
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return classToUnwrap.cast(view);
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
new file mode 100644
index 0000000000..d9e2052b3b
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.AsyncCursor;
+import org.apache.ignite.lang.Cursor;
+import org.apache.ignite.table.DataStreamerItem;
+import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerTarget;
+import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.criteria.Criteria;
+import org.apache.ignite.table.criteria.CriteriaQueryOptions;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+abstract class PublicApiClientViewBase<T> implements DataStreamerTarget<T>, 
CriteriaQuerySource<T> {
+    private final DataStreamerTarget<T> streamerTarget;
+    private final CriteriaQuerySource<T> querySource;
+
+    PublicApiClientViewBase(
+            DataStreamerTarget<T> streamerTarget,
+            CriteriaQuerySource<T> querySource
+    ) {
+        this.streamerTarget = streamerTarget;
+        this.querySource = querySource;
+    }
+
+    @Override
+    public CompletableFuture<Void> streamData(Publisher<DataStreamerItem<T>> 
publisher, @Nullable DataStreamerOptions options) {
+        return executeAsyncOp(() -> streamerTarget.streamData(publisher, 
options));
+    }
+
+    @Override
+    public <E, V, R, A> CompletableFuture<Void> streamData(
+            Publisher<E> publisher,
+            Function<E, T> keyFunc,
+            Function<E, V> payloadFunc,
+            ReceiverDescriptor<A> receiver,
+            @Nullable Flow.Subscriber<R> resultSubscriber,
+            @Nullable DataStreamerOptions options,
+            A receiverArg) {
+        return executeAsyncOp(() -> streamerTarget.streamData(
+                publisher,
+                keyFunc,
+                payloadFunc,
+                receiver,
+                resultSubscriber,
+                options,
+                receiverArg));
+    }
+
+    @Override
+    public Cursor<T> query(
+            @Nullable Transaction tx,
+            @Nullable Criteria criteria,
+            @Nullable String indexName,
+            @Nullable CriteriaQueryOptions opts
+    ) {
+        return executeSyncOp(() -> querySource.query(tx, criteria, indexName, 
opts));
+    }
+
+    @Override
+    public CompletableFuture<AsyncCursor<T>> queryAsync(
+            @Nullable Transaction tx,
+            @Nullable Criteria criteria,
+            @Nullable String indexName,
+            @Nullable CriteriaQueryOptions opts
+    ) {
+        return executeAsyncOp(() -> querySource.queryAsync(tx, criteria, 
indexName, opts));
+    }
+
+    final <U> CompletableFuture<U> 
executeAsyncOp(Supplier<CompletableFuture<U>> operation) {
+        CompletableFuture<U> future = 
PublicApiThreading.execUserAsyncOperation(operation);
+
+        return future;
+    }
+
+    static <T> T executeSyncOp(Supplier<T> operation) {
+        return PublicApiThreading.execUserSyncOperation(operation);
+    }
+
+    static void executeSyncOp(Runnable operation) {
+        PublicApiThreading.execUserSyncOperation(operation);
+    }
+}


Reply via email to