This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bc4855de8b IGNITE-22842 Improve synchronous client API performance
(#4488)
bc4855de8b is described below
commit bc4855de8b290434d70cdaf0e7a68ad0157c00b3
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Oct 3 19:41:46 2024 +0300
IGNITE-22842 Improve synchronous client API performance (#4488)
---
.../ignite/internal/client/ReliableChannel.java | 6 +-
.../ignite/internal/client/TcpClientChannel.java | 155 ++++++++-----
.../ignite/internal/client/table/ClientTable.java | 19 +-
.../table/api/PublicApiClientKeyValueView.java | 250 +++++++++++++++++++++
.../table/api/PublicApiClientRecordView.java | 224 ++++++++++++++++++
.../client/table/api/PublicApiClientViewBase.java | 107 +++++++++
6 files changed, 690 insertions(+), 71 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index ce56bb44a6..b8759c8e0a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
@@ -36,7 +37,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
@@ -728,9 +728,7 @@ public final class ReliableChannel implements AutoCloseable
{
@Nullable
private static IgniteClientConnectionException
unwrapConnectionException(Throwable err) {
- while (err instanceof CompletionException) {
- err = err.getCause();
- }
+ err = unwrapCause(err);
if (!(err instanceof IgniteClientConnectionException)) {
return null;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 54eac0c4ce..80012c5436 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.client;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
@@ -62,6 +64,7 @@ import
org.apache.ignite.internal.future.timeout.TimeoutObject;
import org.apache.ignite.internal.future.timeout.TimeoutWorker;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.lang.ErrorGroups.Table;
@@ -96,7 +99,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
private final AtomicLong reqId = new AtomicLong(1);
/** Pending requests. */
- private final ConcurrentMap<Long, ClientRequestFuture<?>> pendingReqs =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, TimeoutObjectImpl> pendingReqs = new
ConcurrentHashMap<>();
/** Notification handlers. */
private final Map<Long, CompletableFuture<PayloadInputChannel>>
notificationHandlers = new ConcurrentHashMap<>();
@@ -251,8 +254,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
sock.close();
}
- for (ClientRequestFuture<?> pendingReq : pendingReqs.values()) {
- pendingReq.completeExceptionally(
+ for (TimeoutObjectImpl pendingReq : pendingReqs.values()) {
+ pendingReq.future().completeExceptionally(
new IgniteClientConnectionException(CONNECTION_ERR,
"Channel is closed", endpoint(), cause));
}
@@ -272,13 +275,13 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** {@inheritDoc} */
@Override
public void onMessage(ByteBuf buf) {
- asyncContinuationExecutor.execute(() -> {
- try (var unpacker = new ClientMessageUnpacker(buf)) {
- processNextMessage(unpacker);
- } catch (Throwable t) {
- close(t, false);
- }
- });
+ try {
+ var unpacker = new ClientMessageUnpacker(buf);
+
+ processNextMessage(unpacker);
+ } catch (Throwable t) {
+ close(t, false);
+ }
}
/** {@inheritDoc} */
@@ -315,12 +318,12 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
notificationHandlers.put(id, notificationFut);
}
- ClientRequestFuture<T> fut = send(opCode, id, payloadWriter,
payloadReader, notificationFut, operationTimeout);
+ CompletableFuture<T> fut = send(opCode, id, payloadWriter,
payloadReader, notificationFut, operationTimeout);
return fut;
} catch (Throwable t) {
- return CompletableFuture.failedFuture(t);
+ return failedFuture(t);
}
}
@@ -333,7 +336,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
* @param notificationFut Optional notification future.
* @return Request future.
*/
- private <T> ClientRequestFuture<T> send(
+ private <T> CompletableFuture<T> send(
int opCode,
long id,
@Nullable PayloadWriter payloadWriter,
@@ -345,9 +348,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
throw new IgniteClientConnectionException(CONNECTION_ERR, "Channel
is closed", endpoint());
}
- ClientRequestFuture<T> fut = new ClientRequestFuture<>(payloadReader,
notificationFut, timeout);
+ var fut = new CompletableFuture<ClientMessageUnpacker>();
- pendingReqs.put(id, fut);
+ pendingReqs.put(id, new TimeoutObjectImpl(timeout, fut));
metrics.requestsActiveIncrement();
@@ -380,7 +383,26 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
});
- return fut;
+ if (PublicApiThreading.executingSyncPublicApi()) {
+ try {
+ ClientMessageUnpacker unpacker = fut.get();
+
+ return completedFuture(complete(payloadReader,
notificationFut, unpacker));
+ } catch (Exception e) {
+ return failedFuture(e);
+ }
+ }
+
+ return fut.handleAsync(
+ (unpacker, err) -> {
+ if (err != null) {
+ throw sneakyThrow(err);
+ }
+
+ return complete(payloadReader, notificationFut,
unpacker);
+ },
+ asyncContinuationExecutor
+ );
} catch (Throwable t) {
log.warn("Failed to send request [id=" + id + ", op=" + opCode +
", remoteAddress=" + cfg.getAddress() + "]: "
+ t.getMessage(), t);
@@ -398,22 +420,26 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/**
* Completes the request future.
*
- * @param pendingReq Request future.
+ * @param payloadReader Payload reader.
+ * @param notificationFut Notification future.
+ * @param unpacker Unpacked message.
*/
- private <T> void complete(ClientRequestFuture<T> pendingReq,
ClientMessageUnpacker unpacker) {
- if (pendingReq.payloadReader == null) {
- pendingReq.complete(null);
- } else {
- try {
- T res = pendingReq.payloadReader.apply(new
PayloadInputChannel(this, unpacker, pendingReq.notificationFut));
- pendingReq.complete(res);
- } catch (Throwable e) {
- log.error("Failed to deserialize server response
[remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
-
- pendingReq.completeExceptionally(
- new IgniteException(PROTOCOL_ERR, "Failed to
deserialize server response: " + e.getMessage(), e));
+ private <T> T complete(
+ PayloadReader<T> payloadReader,
+ CompletableFuture<PayloadInputChannel> notificationFut,
+ ClientMessageUnpacker unpacker
+ ) {
+ try (unpacker) {
+ if (payloadReader != null) {
+ return payloadReader.apply(new PayloadInputChannel(this,
unpacker, notificationFut));
}
+ } catch (Throwable e) {
+ log.error("Failed to deserialize server response [remoteAddress="
+ cfg.getAddress() + "]: " + e.getMessage(), e);
+
+ throw new IgniteException(PROTOCOL_ERR, "Failed to deserialize
server response: " + e.getMessage(), e);
}
+
+ return null;
}
/**
@@ -422,7 +448,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
private void processNextMessage(ClientMessageUnpacker unpacker) throws
IgniteException {
if (protocolCtx == null) {
// Process handshake.
- complete(pendingReqs.remove(-1L), unpacker);
+ pendingReqs.remove(-1L).future().complete(unpacker);
return;
}
@@ -436,12 +462,15 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (ResponseFlags.getNotificationFlag(flags)) {
handleNotification(resId, unpacker, err);
+
return;
}
- ClientRequestFuture<?> pendingReq = pendingReqs.remove(resId);
+ TimeoutObjectImpl pendingReq = pendingReqs.remove(resId);
if (pendingReq == null) {
+ unpacker.close();
+
log.error("Unexpected response ID [remoteAddress=" +
cfg.getAddress() + "]: " + resId);
throw new IgniteClientConnectionException(PROTOCOL_ERR,
String.format("Unexpected response ID [%s]", resId), endpoint());
@@ -451,11 +480,15 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (err == null) {
metrics.requestsCompletedIncrement();
- complete(pendingReq, unpacker);
+
+ pendingReq.future().complete(unpacker);
} else {
metrics.requestsFailedIncrement();
notificationHandlers.remove(resId);
- pendingReq.completeExceptionally(err);
+
+ unpacker.close();
+
+ pendingReq.future().completeExceptionally(err);
}
}
@@ -484,7 +517,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
throw new IgniteClientConnectionException(PROTOCOL_ERR,
String.format("Unexpected notification ID [%s]", id), endpoint());
}
- try {
+ try (unpacker) {
if (err != null) {
handler.completeExceptionally(err);
} else {
@@ -581,10 +614,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
/** Client handshake. */
- private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver)
- throws IgniteClientConnectionException {
- ClientRequestFuture<Object> fut = new ClientRequestFuture<>(r ->
handshakeRes(r.in()), null, connectTimeout);
- pendingReqs.put(-1L, fut);
+ private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver)
throws IgniteClientConnectionException {
+ var fut = new CompletableFuture<ClientMessageUnpacker>();
+ pendingReqs.put(-1L, new TimeoutObjectImpl(connectTimeout, fut));
handshakeReqAsync(ver).addListener(f -> {
if (!f.isSuccess()) {
@@ -594,20 +626,24 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
});
return fut
- .handle((res, err) -> {
+ .handleAsync((unpacker, err) -> {
if (err != null) {
if (err instanceof TimeoutException || err.getCause()
instanceof TimeoutException) {
metrics.handshakesFailedTimeoutIncrement();
throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout",
endpoint(), err);
- } else {
- metrics.handshakesFailedIncrement();
}
+ metrics.handshakesFailedIncrement();
throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(),
err);
}
- return res;
- });
+ try {
+ return complete(r -> handshakeRes(r.in()), null,
unpacker);
+ } catch (Throwable th) {
+ metrics.handshakesFailedIncrement();
+ throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(),
th);
+ }
+ }, asyncContinuationExecutor);
}
/**
@@ -769,25 +805,24 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
/**
- * Client request future.
+ * Timeout object wrapper for the completable future.
*/
- private static class ClientRequestFuture<T> extends CompletableFuture<T>
implements TimeoutObject<CompletableFuture<T>> {
- @Nullable
- private final PayloadReader<T> payloadReader;
-
- @Nullable
- private final CompletableFuture<PayloadInputChannel> notificationFut;
-
+ private static class TimeoutObjectImpl implements
TimeoutObject<CompletableFuture<ClientMessageUnpacker>> {
+ /** End time (milliseconds since Unix epoch). */
private final long endTime;
- private ClientRequestFuture(
- @Nullable PayloadReader<T> payloadReader,
- @Nullable CompletableFuture<PayloadInputChannel>
notificationFut,
- long timeout
- ) {
- this.payloadReader = payloadReader;
- this.notificationFut = notificationFut;
+ /** Target future. */
+ private final CompletableFuture<ClientMessageUnpacker> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param timeout Timeout in milliseconds.
+ * @param fut Target future.
+ */
+ public TimeoutObjectImpl(long timeout,
CompletableFuture<ClientMessageUnpacker> fut) {
this.endTime = timeout > 0 ? coarseCurrentTimeMillis() + timeout :
0;
+ this.fut = fut;
}
@Override
@@ -796,8 +831,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
@Override
- public CompletableFuture<T> future() {
- return this;
+ public CompletableFuture<ClientMessageUnpacker> future() {
+ return fut;
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 7a475b8985..8ab926ef08 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -40,6 +40,8 @@ import
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ColumnTypeConverter;
import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.table.api.PublicApiClientKeyValueView;
+import org.apache.ignite.internal.client.table.api.PublicApiClientRecordView;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -152,12 +154,12 @@ public class ClientTable implements Table {
public <R> RecordView<R> recordView(Mapper<R> recMapper) {
Objects.requireNonNull(recMapper);
- return new ClientRecordView<>(this, sql, recMapper);
+ return new PublicApiClientRecordView<>(new ClientRecordView<>(this,
sql, recMapper));
}
@Override
public RecordView<Tuple> recordView() {
- return new ClientRecordBinaryView(this, sql);
+ return new PublicApiClientRecordView<>(new
ClientRecordBinaryView(this, sql));
}
/** {@inheritDoc} */
@@ -166,13 +168,13 @@ public class ClientTable implements Table {
Objects.requireNonNull(keyMapper);
Objects.requireNonNull(valMapper);
- return new ClientKeyValueView<>(this, sql, keyMapper, valMapper);
+ return new PublicApiClientKeyValueView<>(new
ClientKeyValueView<>(this, sql, keyMapper, valMapper));
}
/** {@inheritDoc} */
@Override
public KeyValueView<Tuple, Tuple> keyValueView() {
- return new ClientKeyValueBinaryView(this, sql);
+ return new PublicApiClientKeyValueView<>(new
ClientKeyValueBinaryView(this, sql));
}
CompletableFuture<ClientSchema> getLatestSchema() {
@@ -217,7 +219,7 @@ public class ClientTable implements Table {
ClientSchema last = null;
for (var i = 0; i < schemaCnt; i++) {
- last = readSchema(r.in());
+ last = readSchema(r.in(), ver);
if (log.isDebugEnabled()) {
log.debug("Schema loaded [tableId=" + id + ",
schemaVersion=" + last.version() + "]");
@@ -228,7 +230,7 @@ public class ClientTable implements Table {
});
}
- private ClientSchema readSchema(ClientMessageUnpacker in) {
+ private ClientSchema readSchema(ClientMessageUnpacker in, int targetVer) {
var schemaVer = in.unpackInt();
var colCnt = in.unpackInt();
var columns = new ClientColumn[colCnt];
@@ -257,7 +259,10 @@ public class ClientTable implements Table {
}
var schema = new ClientSchema(schemaVer, columns, marshallers);
- schemas.put(schemaVer, CompletableFuture.completedFuture(schema));
+
+ if (schemaVer != targetVer) {
+ schemas.put(schemaVer, CompletableFuture.completedFuture(schema));
+ }
synchronized (latestSchemaLock) {
if (schemaVer > latestSchemaVer) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientKeyValueView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientKeyValueView.java
new file mode 100644
index 0000000000..7f4de6f43f
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientKeyValueView.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table.api;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.NullableValue;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link KeyValueView} that grants a specific role to each
synchronous or asynchronous invocation.
+ *
+ * @see PublicApiThreading#execUserSyncOperation(Supplier)
+ * @see PublicApiThreading#execUserAsyncOperation(Supplier)
+ */
+public class PublicApiClientKeyValueView<K, V> extends
PublicApiClientViewBase<Entry<K, V>> implements KeyValueView<K, V> {
+ private final KeyValueView<K, V> view;
+
+ /**
+ * Constructor.
+ *
+ * @param view View to wrap.
+ */
+ public PublicApiClientKeyValueView(KeyValueView<K, V> view) {
+ super(view, view);
+
+ this.view = view;
+ }
+
+ @Override
+ public @Nullable V get(@Nullable Transaction tx, K key) {
+ return executeSyncOp(() -> view.get(tx, key));
+ }
+
+ @Override
+ public CompletableFuture<V> getAsync(@Nullable Transaction tx, K key) {
+ return executeAsyncOp(() -> view.getAsync(tx, key));
+ }
+
+ @Override
+ public NullableValue<V> getNullable(@Nullable Transaction tx, K key) {
+ return executeSyncOp(() -> view.getNullable(tx, key));
+ }
+
+ @Override
+ public CompletableFuture<NullableValue<V>> getNullableAsync(@Nullable
Transaction tx, K key) {
+ return executeAsyncOp(() -> view.getNullableAsync(tx, key));
+ }
+
+ @Override
+ public @Nullable V getOrDefault(@Nullable Transaction tx, K key, @Nullable
V defaultValue) {
+ return executeSyncOp(() -> view.getOrDefault(tx, key, defaultValue));
+ }
+
+ @Override
+ public CompletableFuture<V> getOrDefaultAsync(@Nullable Transaction tx, K
key, @Nullable V defaultValue) {
+ return executeAsyncOp(() -> view.getOrDefaultAsync(tx, key,
defaultValue));
+ }
+
+ @Override
+ public Map<K, V> getAll(@Nullable Transaction tx, Collection<K> keys) {
+ return executeSyncOp(() -> view.getAll(tx, keys));
+ }
+
+ @Override
+ public CompletableFuture<Map<K, V>> getAllAsync(@Nullable Transaction tx,
Collection<K> keys) {
+ return executeAsyncOp(() -> view.getAllAsync(tx, keys));
+ }
+
+ @Override
+ public boolean contains(@Nullable Transaction tx, K key) {
+ return executeSyncOp(() -> view.contains(tx, key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx,
K key) {
+ return executeAsyncOp(() -> view.containsAsync(tx, key));
+ }
+
+ @Override
+ public boolean containsAll(@Nullable Transaction tx, Collection<K> keys) {
+ return executeSyncOp(() -> view.containsAll(tx, keys));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsAllAsync(@Nullable Transaction
tx, Collection<K> keys) {
+ return executeAsyncOp(() -> view.containsAllAsync(tx, keys));
+ }
+
+ @Override
+ public void put(@Nullable Transaction tx, K key, @Nullable V val) {
+ executeSyncOp(() -> view.put(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<Void> putAsync(@Nullable Transaction tx, K key,
@Nullable V val) {
+ return executeAsyncOp(() -> view.putAsync(tx, key, val));
+ }
+
+ @Override
+ public void putAll(@Nullable Transaction tx, Map<K, V> pairs) {
+ executeSyncOp(() -> view.putAll(tx, pairs));
+ }
+
+ @Override
+ public CompletableFuture<Void> putAllAsync(@Nullable Transaction tx,
Map<K, V> pairs) {
+ return executeAsyncOp(() -> view.putAllAsync(tx, pairs));
+ }
+
+ @Override
+ public @Nullable V getAndPut(@Nullable Transaction tx, K key, @Nullable V
val) {
+ return executeSyncOp(() -> view.getAndPut(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<V> getAndPutAsync(@Nullable Transaction tx, K
key, @Nullable V val) {
+ return executeAsyncOp(() -> view.getAndPutAsync(tx, key, val));
+ }
+
+ @Override
+ public NullableValue<V> getNullableAndPut(@Nullable Transaction tx, K key,
@Nullable V val) {
+ return executeSyncOp(() -> view.getNullableAndPut(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<NullableValue<V>>
getNullableAndPutAsync(@Nullable Transaction tx, K key, @Nullable V val) {
+ return executeAsyncOp(() -> view.getNullableAndPutAsync(tx, key, val));
+ }
+
+ @Override
+ public boolean putIfAbsent(@Nullable Transaction tx, K key, @Nullable V
val) {
+ return executeSyncOp(() -> view.putIfAbsent(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction
tx, K key, @Nullable V val) {
+ return executeAsyncOp(() -> view.putIfAbsentAsync(tx, key, val));
+ }
+
+ @Override
+ public boolean remove(@Nullable Transaction tx, K key) {
+ return executeSyncOp(() -> view.remove(tx, key));
+ }
+
+ @Override
+ public boolean remove(@Nullable Transaction tx, K key, V val) {
+ return executeSyncOp(() -> view.remove(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K
key) {
+ return executeAsyncOp(() -> view.removeAsync(tx, key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K
key, V val) {
+ return executeAsyncOp(() -> view.removeAsync(tx, key, val));
+ }
+
+ @Override
+ public Collection<K> removeAll(@Nullable Transaction tx, Collection<K>
keys) {
+ return executeSyncOp(() -> view.removeAll(tx, keys));
+ }
+
+ @Override
+ public CompletableFuture<Collection<K>> removeAllAsync(@Nullable
Transaction tx, Collection<K> keys) {
+ return executeAsyncOp(() -> view.removeAllAsync(tx, keys));
+ }
+
+ @Override
+ public @Nullable V getAndRemove(@Nullable Transaction tx, K key) {
+ return executeSyncOp(() -> view.getAndRemove(tx, key));
+ }
+
+ @Override
+ public CompletableFuture<V> getAndRemoveAsync(@Nullable Transaction tx, K
key) {
+ return executeAsyncOp(() -> view.getAndRemoveAsync(tx, key));
+ }
+
+ @Override
+ public NullableValue<V> getNullableAndRemove(@Nullable Transaction tx, K
key) {
+ return executeSyncOp(() -> view.getNullableAndRemove(tx, key));
+ }
+
+ @Override
+ public CompletableFuture<NullableValue<V>>
getNullableAndRemoveAsync(@Nullable Transaction tx, K key) {
+ return executeAsyncOp(() -> view.getNullableAndRemoveAsync(tx, key));
+ }
+
+ @Override
+ public boolean replace(@Nullable Transaction tx, K key, @Nullable V val) {
+ return executeSyncOp(() -> view.replace(tx, key, val));
+ }
+
+ @Override
+ public boolean replace(@Nullable Transaction tx, K key, V oldValue,
@Nullable V newValue) {
+ return executeSyncOp(() -> view.replace(tx, key, oldValue, newValue));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K
key, @Nullable V val) {
+ return executeAsyncOp(() -> view.replaceAsync(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K
key, @Nullable V oldVal, @Nullable V newVal) {
+ return executeAsyncOp(() -> view.replaceAsync(tx, key, oldVal,
newVal));
+ }
+
+ @Override
+ public @Nullable V getAndReplace(@Nullable Transaction tx, @Nullable K
key, @Nullable V val) {
+ return executeSyncOp(() -> view.getAndReplace(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<V> getAndReplaceAsync(@Nullable Transaction tx, K
key, @Nullable V val) {
+ return executeAsyncOp(() -> view.getAndReplaceAsync(tx, key, val));
+ }
+
+ @Override
+ public NullableValue<V> getNullableAndReplace(@Nullable Transaction tx, K
key, @Nullable V val) {
+ return executeSyncOp(() -> view.getNullableAndReplace(tx, key, val));
+ }
+
+ @Override
+ public CompletableFuture<NullableValue<V>>
getNullableAndReplaceAsync(@Nullable Transaction tx, K key, @Nullable V val) {
+ return executeAsyncOp(() -> view.getNullableAndReplaceAsync(tx, key,
val));
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientRecordView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientRecordView.java
new file mode 100644
index 0000000000..94ca4ed098
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientRecordView.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table.api;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link RecordView} that grants a specific role to each
synchronous or asynchronous invocation.
+ *
+ * @see PublicApiThreading#execUserSyncOperation(Supplier)
+ * @see PublicApiThreading#execUserAsyncOperation(Supplier)
+ */
+public class PublicApiClientRecordView<R> extends PublicApiClientViewBase<R>
implements RecordView<R>, Wrapper {
+ private final RecordView<R> view;
+
+ /**
+ * Constructor.
+ *
+ * @param view View to wrap.
+ */
+ public PublicApiClientRecordView(RecordView<R> view) {
+ super(view, view);
+
+ this.view = view;
+ }
+
+ @Override
+ public R get(@Nullable Transaction tx, R keyRec) {
+ return executeSyncOp(() -> view.get(tx, keyRec));
+ }
+
+ @Override
+ public CompletableFuture<R> getAsync(@Nullable Transaction tx, R keyRec) {
+ return executeAsyncOp(() -> view.getAsync(tx, keyRec));
+ }
+
+ @Override
+ public List<R> getAll(@Nullable Transaction tx, Collection<R> keyRecs) {
+ return executeSyncOp(() -> view.getAll(tx, keyRecs));
+ }
+
+ @Override
+ public CompletableFuture<List<R>> getAllAsync(@Nullable Transaction tx,
Collection<R> keyRecs) {
+ return executeAsyncOp(() -> view.getAllAsync(tx, keyRecs));
+ }
+
+ @Override
+ public boolean contains(@Nullable Transaction tx, R keyRec) {
+ return executeSyncOp(() -> view.contains(tx, keyRec));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx,
R keyRec) {
+ return executeAsyncOp(() -> view.containsAsync(tx, keyRec));
+ }
+
+ @Override
+ public boolean containsAll(@Nullable Transaction tx, Collection<R> keys) {
+ return executeSyncOp(() -> view.containsAll(tx, keys));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsAllAsync(@Nullable Transaction
tx, Collection<R> keys) {
+ return executeAsyncOp(() -> view.containsAllAsync(tx, keys));
+ }
+
+ @Override
+ public void upsert(@Nullable Transaction tx, R rec) {
+ executeSyncOp(() -> view.upsert(tx, rec));
+ }
+
+ @Override
+ public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, R
rec) {
+ return executeAsyncOp(() -> view.upsertAsync(tx, rec));
+ }
+
+ @Override
+ public void upsertAll(@Nullable Transaction tx, Collection<R> recs) {
+ executeSyncOp(() -> view.upsertAll(tx, recs));
+ }
+
+ @Override
+ public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx,
Collection<R> recs) {
+ return executeAsyncOp(() -> view.upsertAllAsync(tx, recs));
+ }
+
+ @Override
+ public R getAndUpsert(@Nullable Transaction tx, R rec) {
+ return executeSyncOp(() -> view.getAndUpsert(tx, rec));
+ }
+
+ @Override
+ public CompletableFuture<R> getAndUpsertAsync(@Nullable Transaction tx, R
rec) {
+ return executeAsyncOp(() -> view.getAndUpsertAsync(tx, rec));
+ }
+
+ @Override
+ public boolean insert(@Nullable Transaction tx, R rec) {
+ return executeSyncOp(() -> view.insert(tx, rec));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, R
rec) {
+ return executeAsyncOp(() -> view.insertAsync(tx, rec));
+ }
+
+ @Override
+ public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) {
+ return executeSyncOp(() -> view.insertAll(tx, recs));
+ }
+
+ @Override
+ public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx,
Collection<R> recs) {
+ return executeAsyncOp(() -> view.insertAllAsync(tx, recs));
+ }
+
+ @Override
+ public boolean replace(@Nullable Transaction tx, R rec) {
+ return executeSyncOp(() -> view.replace(tx, rec));
+ }
+
+ @Override
+ public boolean replace(@Nullable Transaction tx, R oldRec, R newRec) {
+ return executeSyncOp(() -> view.replace(tx, oldRec, newRec));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R
rec) {
+ return executeAsyncOp(() -> view.replaceAsync(tx, rec));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R
oldRec, R newRec) {
+ return executeAsyncOp(() -> view.replaceAsync(tx, oldRec, newRec));
+ }
+
+ @Override
+ public R getAndReplace(@Nullable Transaction tx, R rec) {
+ return executeSyncOp(() -> view.getAndReplace(tx, rec));
+ }
+
+ @Override
+ public CompletableFuture<R> getAndReplaceAsync(@Nullable Transaction tx, R
rec) {
+ return executeAsyncOp(() -> view.getAndReplaceAsync(tx, rec));
+ }
+
+ @Override
+ public boolean delete(@Nullable Transaction tx, R keyRec) {
+ return executeSyncOp(() -> view.delete(tx, keyRec));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, R
keyRec) {
+ return executeAsyncOp(() -> view.deleteAsync(tx, keyRec));
+ }
+
+ @Override
+ public boolean deleteExact(@Nullable Transaction tx, R rec) {
+ return executeSyncOp(() -> view.deleteExact(tx, rec));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction
tx, R rec) {
+ return executeAsyncOp(() -> view.deleteExactAsync(tx, rec));
+ }
+
+ @Override
+ public R getAndDelete(@Nullable Transaction tx, R keyRec) {
+ return executeSyncOp(() -> view.getAndDelete(tx, keyRec));
+ }
+
+ @Override
+ public CompletableFuture<R> getAndDeleteAsync(@Nullable Transaction tx, R
keyRec) {
+ return executeAsyncOp(() -> view.getAndDeleteAsync(tx, keyRec));
+ }
+
+ @Override
+ public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) {
+ return executeSyncOp(() -> view.deleteAll(tx, keyRecs));
+ }
+
+ @Override
+ public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx,
Collection<R> keyRecs) {
+ return executeAsyncOp(() -> view.deleteAllAsync(tx, keyRecs));
+ }
+
+ @Override
+ public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R>
recs) {
+ return executeSyncOp(() -> view.deleteAllExact(tx, recs));
+ }
+
+ @Override
+ public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable
Transaction tx, Collection<R> recs) {
+ return executeAsyncOp(() -> view.deleteAllExactAsync(tx, recs));
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return classToUnwrap.cast(view);
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
new file mode 100644
index 0000000000..d9e2052b3b
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/api/PublicApiClientViewBase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.AsyncCursor;
+import org.apache.ignite.lang.Cursor;
+import org.apache.ignite.table.DataStreamerItem;
+import org.apache.ignite.table.DataStreamerOptions;
+import org.apache.ignite.table.DataStreamerTarget;
+import org.apache.ignite.table.ReceiverDescriptor;
+import org.apache.ignite.table.criteria.Criteria;
+import org.apache.ignite.table.criteria.CriteriaQueryOptions;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract view wrapper that holds a {@link DataStreamerTarget} delegate and a {@link CriteriaQuerySource} delegate and
+ * forwards every data-streamer and criteria-query call to them, passing each call through {@link PublicApiThreading}.
+ *
+ * @param <T> Item type shared by both delegates.
+ */
+abstract class PublicApiClientViewBase<T> implements DataStreamerTarget<T>, CriteriaQuerySource<T> {
+    /** Delegate that receives the {@code streamData} calls. */
+    private final DataStreamerTarget<T> streamerDelegate;
+
+    /** Delegate that receives the {@code query} and {@code queryAsync} calls. */
+    private final CriteriaQuerySource<T> queryDelegate;
+
+    /**
+     * Stores the two delegates.
+     *
+     * @param streamerTarget Delegate for data-streamer calls.
+     * @param querySource Delegate for criteria-query calls.
+     */
+    PublicApiClientViewBase(
+            DataStreamerTarget<T> streamerTarget,
+            CriteriaQuerySource<T> querySource
+    ) {
+        this.streamerDelegate = streamerTarget;
+        this.queryDelegate = querySource;
+    }
+
+    /** Forwards to the streamer delegate's two-argument {@code streamData} via {@link #executeAsyncOp(Supplier)}. */
+    @Override
+    public CompletableFuture<Void> streamData(Publisher<DataStreamerItem<T>> publisher, @Nullable DataStreamerOptions options) {
+        return executeAsyncOp(() -> streamerDelegate.streamData(publisher, options));
+    }
+
+    /** Forwards to the streamer delegate's receiver-based {@code streamData} via {@link #executeAsyncOp(Supplier)}. */
+    @Override
+    public <E, V, R, A> CompletableFuture<Void> streamData(
+            Publisher<E> publisher,
+            Function<E, T> keyFunc,
+            Function<E, V> payloadFunc,
+            ReceiverDescriptor<A> receiver,
+            @Nullable Flow.Subscriber<R> resultSubscriber,
+            @Nullable DataStreamerOptions options,
+            A receiverArg
+    ) {
+        return executeAsyncOp(
+                () -> streamerDelegate.streamData(publisher, keyFunc, payloadFunc, receiver, resultSubscriber, options, receiverArg)
+        );
+    }
+
+    /** Forwards to the query delegate's {@code query} via {@link #executeSyncOp(Supplier)}. */
+    @Override
+    public Cursor<T> query(
+            @Nullable Transaction tx,
+            @Nullable Criteria criteria,
+            @Nullable String indexName,
+            @Nullable CriteriaQueryOptions opts
+    ) {
+        return executeSyncOp(() -> queryDelegate.query(tx, criteria, indexName, opts));
+    }
+
+    /** Forwards to the query delegate's {@code queryAsync} via {@link #executeAsyncOp(Supplier)}. */
+    @Override
+    public CompletableFuture<AsyncCursor<T>> queryAsync(
+            @Nullable Transaction tx,
+            @Nullable Criteria criteria,
+            @Nullable String indexName,
+            @Nullable CriteriaQueryOptions opts
+    ) {
+        return executeAsyncOp(() -> queryDelegate.queryAsync(tx, criteria, indexName, opts));
+    }
+
+    /**
+     * Passes an asynchronous operation to {@link PublicApiThreading#execUserAsyncOperation} and returns the future that call
+     * produces, unchanged.
+     *
+     * @param operation Supplier of the operation's future.
+     * @param <U> Result type of the future.
+     * @return Future returned by {@code PublicApiThreading.execUserAsyncOperation}.
+     */
+    final <U> CompletableFuture<U> executeAsyncOp(Supplier<CompletableFuture<U>> operation) {
+        return PublicApiThreading.execUserAsyncOperation(operation);
+    }
+
+    /**
+     * Passes a value-returning synchronous operation to {@link PublicApiThreading#execUserSyncOperation}.
+     *
+     * @param operation Operation to run.
+     * @param <T> Result type.
+     * @return Whatever {@code PublicApiThreading.execUserSyncOperation} returns for the operation.
+     */
+    static <T> T executeSyncOp(Supplier<T> operation) {
+        return PublicApiThreading.execUserSyncOperation(operation);
+    }
+
+    /**
+     * Passes a synchronous operation without a result to {@link PublicApiThreading#execUserSyncOperation}.
+     *
+     * @param operation Operation to run.
+     */
+    static void executeSyncOp(Runnable operation) {
+        PublicApiThreading.execUserSyncOperation(operation);
+    }
+}