This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 37fe3e86f1 IGNITE-19888 Java client: Track observable timestamp (#2371)
37fe3e86f1 is described below
commit 37fe3e86f176e3ca43b605d42362dfd9acee1583
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Aug 1 14:14:30 2023 +0300
IGNITE-19888 Java client: Track observable timestamp (#2371)
* Add observable timestamp to standard response header in client protocol
(including errors, because data modification can occur before the error, like
in a compute task - so we still need to update client-side timestamp)
* Add observable timestamp to TX_BEGIN and SQL_EXEC client requests
* Propagate latest known timestamp in Java client
* Fix .NET and C++ clients to ignore the timestamp for now (separate
tickets exist)
---
.../internal/client/proto/ClientMessagePacker.java | 50 ++++++
modules/client-handler/build.gradle | 2 +
.../apache/ignite/client/handler/TestServer.java | 22 ++-
.../ignite/client/handler/ClientHandlerModule.java | 18 +-
.../handler/ClientInboundMessageHandler.java | 40 ++++-
.../requests/sql/ClientSqlExecuteRequest.java | 16 +-
.../handler/requests/table/ClientTableCommon.java | 16 +-
.../table/ClientTupleContainsKeyRequest.java | 2 +-
.../table/ClientTupleDeleteAllExactRequest.java | 2 +-
.../table/ClientTupleDeleteAllRequest.java | 2 +-
.../table/ClientTupleDeleteExactRequest.java | 2 +-
.../requests/table/ClientTupleDeleteRequest.java | 2 +-
.../requests/table/ClientTupleGetAllRequest.java | 2 +-
.../table/ClientTupleGetAndDeleteRequest.java | 2 +-
.../table/ClientTupleGetAndReplaceRequest.java | 2 +-
.../table/ClientTupleGetAndUpsertRequest.java | 2 +-
.../requests/table/ClientTupleGetRequest.java | 2 +-
.../table/ClientTupleInsertAllRequest.java | 2 +-
.../requests/table/ClientTupleInsertRequest.java | 2 +-
.../table/ClientTupleReplaceExactRequest.java | 2 +-
.../requests/table/ClientTupleReplaceRequest.java | 2 +-
.../table/ClientTupleUpsertAllRequest.java | 2 +-
.../requests/table/ClientTupleUpsertRequest.java | 2 +-
.../requests/tx/ClientTransactionBeginRequest.java | 47 +++--
.../ignite/internal/client/ClientChannel.java | 15 +-
.../ignite/internal/client/ReliableChannel.java | 23 +++
.../ignite/internal/client/TcpClientChannel.java | 14 ++
.../ignite/internal/client/sql/ClientSession.java | 2 +
.../internal/client/tx/ClientTransactions.java | 5 +-
.../apache/ignite/client/ClientMetricsTest.java | 2 +-
.../client/ObservableTimestampPropagationTest.java | 97 +++++++++++
.../apache/ignite/client/RequestBalancingTest.java | 2 +-
.../ignite/client/TestClientHandlerModule.java | 15 +-
.../java/org/apache/ignite/client/TestServer.java | 46 ++++-
.../org/apache/ignite/client/fakes/FakeIgnite.java | 103 +----------
.../apache/ignite/client/fakes/FakeTxManager.java | 191 +++++++++++++++++++++
.../RepeatedFinishClientTransactionTest.java | 7 +-
.../cpp/ignite/client/detail/node_connection.cpp | 3 +
.../cpp/ignite/client/detail/sql/sql_impl.cpp | 30 ++--
.../client/detail/transaction/transactions_impl.h | 3 +
.../platforms/cpp/ignite/odbc/query/data_query.cpp | 3 +
.../platforms/cpp/ignite/odbc/sql_connection.cpp | 3 +
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 1 +
.../dotnet/Apache.Ignite.Tests/MetricsTests.cs | 2 +-
.../platforms/dotnet/Apache.Ignite/ErrorGroups.cs | 1 -
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 3 +
.../Internal/Proto/MsgPack/MsgPackWriter.cs | 5 +
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 3 +
.../Internal/Transactions/Transactions.cs | 11 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../org/apache/ignite/internal/tx/TxManager.java | 2 +-
.../internal/tx/impl/IgniteTransactionsImpl.java | 31 ++--
.../ignite/internal/tx/impl/TxManagerImpl.java | 2 +-
53 files changed, 680 insertions(+), 191 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 3584b6cdad..f4ef190be4 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -46,6 +46,11 @@ public class ClientMessagePacker implements AutoCloseable {
*/
private boolean closed;
+ /**
+ * Metadata.
+ */
+ private @Nullable Object meta;
+
/**
* Constructor.
*
@@ -179,6 +184,29 @@ public class ClientMessagePacker implements AutoCloseable {
}
}
+ /**
+ * Reserve space for long value.
+ *
+ * @return Index of reserved space.
+ */
+ public int reserveLong() {
+ buf.writeByte(Code.INT64);
+ var index = buf.writerIndex();
+
+ buf.writeLong(0);
+ return index;
+ }
+
+ /**
+ * Set long value at reserved index (see {@link #reserveLong()}).
+ *
+ * @param index Index.
+ * @param v Value.
+ */
+ public void setLong(int index, long v) {
+ buf.setLong(index, v);
+ }
+
/**
* Writes a long value.
*
@@ -559,6 +587,10 @@ public class ClientMessagePacker implements AutoCloseable {
packInt(vals.length);
+ if (vals.length == 0) {
+ return;
+ }
+
// Builder with inline schema.
// Every element in vals is represented by 3 tuple elements: type,
scale, value.
var builder = new BinaryTupleBuilder(vals.length * 3);
@@ -631,6 +663,24 @@ public class ClientMessagePacker implements AutoCloseable {
writePayload(buf);
}
+ /**
+ * Gets metadata.
+ *
+ * @return Metadata.
+ */
+ public @Nullable Object meta() {
+ return meta;
+ }
+
+ /**
+ * Sets metadata.
+ *
+ * @param meta Metadata.
+ */
+ public void meta(@Nullable Object meta) {
+ this.meta = meta;
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/modules/client-handler/build.gradle
b/modules/client-handler/build.gradle
index 008e6208ca..17907c6f81 100644
--- a/modules/client-handler/build.gradle
+++ b/modules/client-handler/build.gradle
@@ -35,6 +35,7 @@ dependencies {
implementation project(':ignite-schema')
implementation project(':ignite-security')
implementation project(':ignite-metrics')
+ implementation project(':ignite-transactions')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.slf4j.jdk14
@@ -59,6 +60,7 @@ dependencies {
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation project(':ignite-security')
+ integrationTestImplementation project(':ignite-transactions')
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation libs.msgpack.core
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index 552806934a..b1ddc6c992 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -32,16 +32,17 @@ import
org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
import
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
@@ -115,10 +116,21 @@ public class TestServer {
Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id");
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
- var module = new ClientHandlerModule(mock(QueryProcessor.class),
mock(IgniteTablesInternal.class), mock(IgniteTransactions.class),
- registry, mock(IgniteCompute.class), clusterService,
bootstrapFactory, mock(IgniteSql.class),
- () -> CompletableFuture.completedFuture(UUID.randomUUID()),
mock(MetricManager.class), metrics,
- authenticationManager(), authenticationConfiguration
+ var module = new ClientHandlerModule(
+ mock(QueryProcessor.class),
+ mock(IgniteTablesInternal.class),
+ mock(IgniteTransactionsImpl.class),
+ registry,
+ mock(IgniteCompute.class),
+ clusterService,
+ bootstrapFactory,
+ mock(IgniteSql.class),
+ () -> CompletableFuture.completedFuture(UUID.randomUUID()),
+ mock(MetricManager.class),
+ metrics,
+ authenticationManager(),
+ authenticationConfiguration,
+ new HybridClockImpl()
);
module.start();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index a8fe8e1f79..6dacd07709 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.configuration.AuthenticationConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -44,13 +45,13 @@ import
org.apache.ignite.internal.network.ssl.SslContextProvider;
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.tx.IgniteTransactions;
/**
* Client handler module maintains TCP endpoint for thin client connections.
@@ -66,7 +67,7 @@ public class ClientHandlerModule implements IgniteComponent {
private final IgniteTablesInternal igniteTables;
/** Ignite transactions API. */
- private final IgniteTransactions igniteTransactions;
+ private final IgniteTransactionsImpl igniteTransactions;
/** Ignite SQL API. */
private final IgniteSql sql;
@@ -102,6 +103,8 @@ public class ClientHandlerModule implements IgniteComponent
{
private final AuthenticationConfiguration authenticationConfiguration;
+ private final HybridClock clock;
+
/**
* Constructor.
*
@@ -117,11 +120,12 @@ public class ClientHandlerModule implements
IgniteComponent {
* @param metricManager Metric manager.
* @param authenticationManager Authentication manager.
* @param authenticationConfiguration Authentication configuration.
+ * @param clock Hybrid clock.
*/
public ClientHandlerModule(
QueryProcessor queryProcessor,
IgniteTablesInternal igniteTables,
- IgniteTransactions igniteTransactions,
+ IgniteTransactionsImpl igniteTransactions,
ConfigurationRegistry registry,
IgniteCompute igniteCompute,
ClusterService clusterService,
@@ -131,7 +135,8 @@ public class ClientHandlerModule implements IgniteComponent
{
MetricManager metricManager,
ClientHandlerMetricSource metrics,
AuthenticationManager authenticationManager,
- AuthenticationConfiguration authenticationConfiguration) {
+ AuthenticationConfiguration authenticationConfiguration,
+ HybridClock clock) {
assert igniteTables != null;
assert registry != null;
assert queryProcessor != null;
@@ -144,6 +149,7 @@ public class ClientHandlerModule implements IgniteComponent
{
assert metrics != null;
assert authenticationManager != null;
assert authenticationConfiguration != null;
+ assert clock != null;
this.queryProcessor = queryProcessor;
this.igniteTables = igniteTables;
@@ -158,6 +164,7 @@ public class ClientHandlerModule implements IgniteComponent
{
this.metrics = metrics;
this.authenticationManager = authenticationManager;
this.authenticationConfiguration = authenticationConfiguration;
+ this.clock = clock;
}
/** {@inheritDoc} */
@@ -291,7 +298,8 @@ public class ClientHandlerModule implements IgniteComponent
{
sql,
clusterId,
metrics,
- authenticationManager);
+ authenticationManager,
+ clock);
authenticationConfiguration.listen(clientInboundMessageHandler);
return clientInboundMessageHandler;
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index ef195a296d..7e648904b8 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -92,6 +92,8 @@ import
org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.internal.configuration.AuthenticationView;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -104,6 +106,7 @@ import
org.apache.ignite.internal.security.authentication.UserDetails;
import
org.apache.ignite.internal.security.authentication.UsernamePasswordRequest;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -113,7 +116,6 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.security.AuthenticationException;
import org.apache.ignite.security.AuthenticationType;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.Nullable;
/**
@@ -128,7 +130,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
private final IgniteTablesInternal igniteTables;
/** Ignite transactions API. */
- private final IgniteTransactions igniteTransactions;
+ private final IgniteTransactionsImpl igniteTransactions;
/** JDBC Handler. */
private final JdbcQueryEventHandler jdbcQueryEventHandler;
@@ -157,6 +159,9 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
/** Metrics. */
private final ClientHandlerMetricSource metrics;
+ /** Hybrid clock. */
+ private final HybridClock clock;
+
/** Context. */
private ClientContext clientContext;
@@ -185,10 +190,11 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
* @param clusterId Cluster ID.
* @param metrics Metrics.
* @param authenticationManager Authentication manager.
+ * @param clock Hybrid clock.
*/
public ClientInboundMessageHandler(
IgniteTablesInternal igniteTables,
- IgniteTransactions igniteTransactions,
+ IgniteTransactionsImpl igniteTransactions,
QueryProcessor processor,
ClientConnectorView configuration,
IgniteCompute compute,
@@ -196,7 +202,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
IgniteSql sql,
UUID clusterId,
ClientHandlerMetricSource metrics,
- AuthenticationManager authenticationManager
+ AuthenticationManager authenticationManager,
+ HybridClock clock
) {
assert igniteTables != null;
assert igniteTransactions != null;
@@ -208,6 +215,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
assert clusterId != null;
assert metrics != null;
assert authenticationManager != null;
+ assert clock != null;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
@@ -218,6 +226,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
this.clusterId = clusterId;
this.metrics = metrics;
this.authenticationManager = authenticationManager;
+ this.clock = clock;
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
jdbcQueryEventHandler =
@@ -386,6 +395,11 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
packer.packLong(requestId);
writeFlags(packer, ctx);
+ // Include server timestamp in error response as well:
+ // an operation can modify data and then throw an exception (e.g.
Compute task),
+ // so we still need to update client-side timestamp to preserve
causality guarantees.
+ packer.packLong(observableTimestamp(null));
+
writeErrorCore(err, packer);
write(packer, ctx);
@@ -456,6 +470,9 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
out.packInt(ServerMessageType.RESPONSE);
out.packLong(requestId);
writeFlags(out, ctx);
+
+ // Observable timestamp should be calculated after the operation
is processed; reserve space, write later.
+ int observableTimestampIdx = out.reserveLong();
out.packNil(); // No error.
var fut = processOperation(in, out, opCode);
@@ -463,6 +480,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
if (fut == null) {
// Operation completed synchronously.
in.close();
+ out.setLong(observableTimestampIdx, observableTimestamp(out));
write(out, ctx);
if (LOG.isTraceEnabled()) {
@@ -486,6 +504,7 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
metrics.requestsFailedIncrement();
} else {
+ out.setLong(observableTimestampIdx,
observableTimestamp(out));
write(out, ctx);
metrics.requestsProcessedIncrement();
@@ -724,4 +743,17 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
return null;
}
+
+ private long observableTimestamp(@Nullable ClientMessagePacker out) {
+ // Certain operations can override the timestamp and provide it in the
meta object.
+ if (out != null) {
+ Object meta = out.meta();
+
+ if (meta instanceof HybridTimestamp) {
+ return ((HybridTimestamp) meta).longValue();
+ }
+ }
+
+ return clock.now().longValue();
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index b68da2fc36..ad8675dd08 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -68,7 +69,7 @@ public class ClientSqlExecuteRequest {
IgniteSql sql,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics) {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
Session session = readSession(in, sql);
Statement statement = readStatement(in, sql);
Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
@@ -78,9 +79,20 @@ public class ClientSqlExecuteRequest {
arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
}
+ // TODO IGNITE-19898 SQL implicit RO transaction should use
observation timestamp.
+ HybridTimestamp unused =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
+
return session
.executeAsync(tx, statement, arguments)
- .thenCompose(asyncResultSet -> writeResultSetAsync(out,
resources, asyncResultSet, session, metrics));
+ .thenCompose(asyncResultSet -> {
+ //noinspection StatementWithEmptyBody
+ if (tx == null) {
+ // TODO IGNITE-19898 Return readTimestamp from
implicit RO TX to the client
+ // out.meta(asyncResultSet.tx().readTimestamp());
+ }
+
+ return writeResultSetAsync(out, resources, asyncResultSet,
session, metrics);
+ });
}
private static CompletionStage<Void> writeResultSetAsync(
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 0c05718307..ad25f0be1e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -42,13 +42,13 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.TemporalNativeType;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.manager.IgniteTables;
-import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -367,16 +367,26 @@ public class ClientTableCommon {
* Reads transaction.
*
* @param in Unpacker.
+ * @param out Packer.
* @param resources Resource registry.
* @return Transaction, if present, or null.
*/
- public static @Nullable Transaction readTx(ClientMessageUnpacker in,
ClientResourceRegistry resources) {
+ public static @Nullable InternalTransaction readTx(
+ ClientMessageUnpacker in, ClientMessagePacker out,
ClientResourceRegistry resources) {
if (in.tryUnpackNil()) {
return null;
}
try {
- return resources.get(in.unpackLong()).get(Transaction.class);
+ var tx =
resources.get(in.unpackLong()).get(InternalTransaction.class);
+
+ if (tx != null && tx.isReadOnly()) {
+ // For read-only tx, override observable timestamp that we
send to the client:
+ // use readTimestamp() instead of now().
+ out.meta(tx.readTimestamp());
+ }
+
+ return tx;
} catch (IgniteInternalCheckedException e) {
throw new IgniteException(e.traceId(), e.code(), e.getMessage(),
e);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
index e3564e33b8..9e21b2bd46 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleContainsKeyRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var keyTuple = readTuple(in, table, true);
return table.recordView().getAsync(tx, keyTuple)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index 7f203d0051..4ebaac2182 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleDeleteAllExactRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuples = readTuples(in, table, false);
return table.recordView().deleteAllExactAsync(tx, tuples)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index 3bcb103f66..2b991d444d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -49,7 +49,7 @@ public class ClientTupleDeleteAllRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuples = readTuples(in, table, true);
return table.recordView().deleteAllAsync(tx,
tuples).thenAccept(skippedTuples ->
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index 0d9a480403..55aac9821e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleDeleteExactRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, false);
return table.recordView().deleteExactAsync(tx,
tuple).thenAccept(res -> {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index 9b43617036..cda8f6d7b6 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleDeleteRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, true);
return table.recordView().deleteAsync(tx, tuple).thenAccept(res ->
{
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index 3f2b74886b..ca63e74323 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -49,7 +49,7 @@ public class ClientTupleGetAllRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var keyTuples = readTuples(in, table, true);
return table.recordView().getAllAsync(tx,
keyTuples).thenAccept(tuples ->
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index 91197d6bc1..c2b894c70e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetAndDeleteRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, true);
return table.recordView().getAndDeleteAsync(tx, tuple).thenAccept(
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index 7a78ecc44c..626750d2f2 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetAndReplaceRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, false);
return table.recordView().getAndReplaceAsync(tx, tuple).thenAccept(
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index 9245336c45..873a0e6758 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetAndUpsertRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, false);
return table.recordView().getAndUpsertAsync(tx, tuple).thenAccept(
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index 4d6530a8b7..51c88acfa2 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var keyTuple = readTuple(in, table, true);
return table.recordView().getAsync(tx, keyTuple)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index 0af2351882..cc7291643b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleInsertAllRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuples = readTuples(in, table, false);
return table.recordView().insertAllAsync(tx,
tuples).thenAccept(skippedTuples ->
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index 81cf7f71e9..5398ea50fd 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleInsertRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, false);
return table.recordView().insertAsync(tx, tuple).thenAccept(res ->
{
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index 4d10dd7859..0c1955e21b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleReplaceExactRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var schema = readSchema(in, table);
var oldTuple = readTuple(in, false, schema);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index e7df36a156..46b1f48958 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleReplaceRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, false);
return table.recordView().replaceAsync(tx, tuple).thenAccept(res
-> {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index a4f178d1ea..63e9192bba 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleUpsertAllRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuples = readTuples(in, table, false);
return table.recordView().upsertAllAsync(tx, tuples)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index f78990d921..b1d3bcf204 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleUpsertRequest {
ClientResourceRegistry resources
) {
return readTableAsync(in, tables).thenCompose(table -> {
- var tx = readTx(in, resources);
+ var tx = readTx(in, out, resources);
var tuple = readTuple(in, table, false);
return table.recordView().upsertAsync(tx, tuple).thenAccept(v ->
out.packInt(table.schemaView().lastSchemaVersion()));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 5f05681c57..8464cc4a89 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -23,10 +23,11 @@ import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
/**
* Client transaction begin request.
@@ -42,28 +43,42 @@ public class ClientTransactionBeginRequest {
* @param metrics Metrics.
* @return Future.
*/
- public static CompletableFuture<Void> process(
+ public static @Nullable CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
- IgniteTransactions transactions,
+ IgniteTransactionsImpl transactions,
ClientResourceRegistry resources,
- ClientHandlerMetricSource metrics) {
+ ClientHandlerMetricSource metrics) throws
IgniteInternalCheckedException {
TransactionOptions options = null;
+ HybridTimestamp observableTs = null;
- if (in.unpackBoolean()) {
+ boolean readOnly = in.unpackBoolean();
+ if (readOnly) {
options = new TransactionOptions().readOnly(true);
+
+ // Timestamp makes sense only for read-only transactions.
+ observableTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
+ }
+
+ // NOTE: we don't use beginAsync here because it is synchronous anyway.
+ var tx = transactions.begin(options, observableTs);
+
+ if (readOnly) {
+ // For read-only tx, override observable timestamp that we send to
the client:
+ // use readTimestamp() instead of now().
+ out.meta(tx.readTimestamp());
}
- return transactions.beginAsync(options).thenAccept(tx -> {
- try {
- long resourceId = resources.put(new ClientResource(tx,
tx::rollbackAsync));
- out.packLong(resourceId);
+ try {
+ long resourceId = resources.put(new ClientResource(tx,
tx::rollbackAsync));
+ out.packLong(resourceId);
+
+ metrics.transactionsActiveIncrement();
- metrics.transactionsActiveIncrement();
- } catch (IgniteInternalCheckedException e) {
- tx.rollback();
- throw new IgniteInternalException(e.getMessage(), e);
- }
- });
+ return null;
+ } catch (IgniteInternalCheckedException e) {
+ tx.rollback();
+ throw e;
+ }
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index 46895e3ac1..e1c3ca42d4 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -33,7 +33,7 @@ public interface ClientChannel extends AutoCloseable {
* @param <T> Response type.
* @return Future for the operation.
*/
- public <T> CompletableFuture<T> serviceAsync(
+ <T> CompletableFuture<T> serviceAsync(
int opCode,
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader
@@ -44,19 +44,26 @@ public interface ClientChannel extends AutoCloseable {
*
* @return {@code True} channel is closed.
*/
- public boolean closed();
+ boolean closed();
/**
* Returns protocol context.
*
* @return Protocol context.
*/
- public ProtocolContext protocolContext();
+ ProtocolContext protocolContext();
/**
* Add topology change listener.
*
* @param listener Listener.
*/
- public void addTopologyAssignmentChangeListener(Consumer<ClientChannel>
listener);
+ void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener);
+
+ /**
+ * Add observable timestamp listener.
+ *
+ * @param listener Listener.
+ */
+ void addObservableTimestampListener(Consumer<Long> listener);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 0239296e59..c24098c1c2 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -106,6 +106,9 @@ public final class ReliableChannel implements AutoCloseable
{
* the table will compare its version with channel version to detect an
update. */
private final AtomicLong assignmentVersion = new AtomicLong();
+ /** Observable timestamp, or causality token. Sent by the server with
every response, and required by some requests. */
+ private final AtomicLong observableTimestamp = new AtomicLong();
+
/** Cluster id from the first handshake. */
private final AtomicReference<UUID> clusterId = new AtomicReference<>();
@@ -180,6 +183,10 @@ public final class ReliableChannel implements
AutoCloseable {
return clientCfg;
}
+ public long observableTimestamp() {
+ return observableTimestamp.get();
+ }
+
/**
* Sends request and handles response asynchronously.
*
@@ -649,6 +656,21 @@ public final class ReliableChannel implements
AutoCloseable {
}
}
+ private void onObservableTimestampReceived(Long newTs) {
+ // Atomically update the observable timestamp to max(newTs, curTs).
+ while (true) {
+ long curTs = observableTimestamp.get();
+
+ if (curTs >= newTs) {
+ break;
+ }
+
+ if (observableTimestamp.compareAndSet(curTs, newTs)) {
+ break;
+ }
+ }
+ }
+
private void onTopologyAssignmentChanged(ClientChannel clientChannel) {
// NOTE: Multiple channels will send the same update to us, resulting
in multiple cache invalidations.
// This could be solved with a cluster-wide AssignmentVersion, but we
don't have that.
@@ -793,6 +815,7 @@ public final class ReliableChannel implements AutoCloseable
{
}
ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
+
ch.addObservableTimestampListener(ReliableChannel.this::onObservableTimestampReceived);
ClusterNode newNode = ch.protocolContext().clusterNode();
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 61f72bbebb..e11d3dd4de 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -102,6 +102,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Topology change listeners. */
private final Collection<Consumer<ClientChannel>>
assignmentChangeListeners = new CopyOnWriteArrayList<>();
+ /** Observable timestamp listeners. */
+ private final Collection<Consumer<Long>> observableTimestampListeners =
new CopyOnWriteArrayList<>();
+
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();
@@ -402,6 +405,12 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
}
+ long observableTimestamp = unpacker.unpackLong();
+
+ for (Consumer<Long> listener : observableTimestampListeners) {
+ listener.accept(observableTimestamp);
+ }
+
if (unpacker.tryUnpackNil()) {
boolean completed = pendingReq.complete(unpacker);
@@ -489,6 +498,11 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
assignmentChangeListeners.add(listener);
}
+ @Override
+ public void addObservableTimestampListener(Consumer<Long> listener) {
+ observableTimestampListeners.add(listener);
+ }
+
private static void validateConfiguration(ClientChannelConfiguration cfg) {
String error = null;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 970bdd4eae..6f309d293c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -158,6 +158,8 @@ public class ClientSession implements Session {
w.out().packString(clientStatement.query());
w.out().packObjectArrayAsBinaryTuple(arguments);
+
+ w.out().packLong(ch.observableTimestamp());
};
PayloadReader<AsyncResultSet<T>> payloadReader = r -> new
ClientAsyncResultSet<>(r.clientChannel(), r.in(), mapper);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index b2102b4e8f..8944d353a5 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -64,7 +64,10 @@ public class ClientTransactions implements
IgniteTransactions {
return ch.serviceAsync(
ClientOp.TX_BEGIN,
- w -> w.out().packBoolean(readOnly),
+ w -> {
+ w.out().packBoolean(readOnly);
+ w.out().packLong(ch.observableTimestamp());
+ },
r -> readTx(r, readOnly));
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index ae52552461..8b83a66c3f 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -223,7 +223,7 @@ public class ClientMetricsTest extends
BaseIgniteAbstractTest {
client.tables().tables();
assertEquals(21, metrics().bytesSent());
- assertEquals(55, metrics().bytesReceived());
+ assertEquals(64, metrics().bytesReceived());
}
@Test
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
new file mode 100644
index 0000000000..78b4c31293
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that observable timestamp (causality token) is propagated from server
to client and back.
+ */
+public class ObservableTimestampPropagationTest {
+ private static TestServer testServer;
+
+ private static FakeIgnite ignite;
+
+ private static IgniteClient client;
+
+ private static final AtomicLong currentServerTimestamp = new AtomicLong(1);
+
+ @BeforeAll
+ public static void startServer2() {
+ TestHybridClock clock = new
TestHybridClock(currentServerTimestamp::get);
+
+ ignite = new FakeIgnite("server-2");
+ testServer = new TestServer(0, ignite, null, null, "server-2",
UUID.randomUUID(), null, null, clock);
+
+ client = IgniteClient.builder().addresses("127.0.0.1:" +
testServer.port()).build();
+ }
+
+ @AfterAll
+ public static void stopServer2() throws Exception {
+ IgniteUtils.closeAll(client, testServer, ignite);
+ }
+
+ @Test
+ public void testClientPropagatesLatestKnownHybridTimestamp() {
+ assertNull(lastObservableTimestamp());
+
+ // RW TX does not propagate timestamp.
+ client.transactions().begin();
+ assertNull(lastObservableTimestamp());
+
+ // RO TX propagates timestamp.
+ client.transactions().begin(new TransactionOptions().readOnly(true));
+ assertEquals(1, lastObservableTimestamp());
+
+ // Increase timestamp on server - client does not know about it
initially.
+ currentServerTimestamp.set(11);
+ client.transactions().begin(new TransactionOptions().readOnly(true));
+ assertEquals(1, lastObservableTimestamp());
+
+ // Subsequent RO TX propagates latest known timestamp.
+ client.tables().tables();
+ client.transactions().begin(new TransactionOptions().readOnly(true));
+ assertEquals(11, lastObservableTimestamp());
+
+ // Smaller timestamp from server is ignored by client.
+ currentServerTimestamp.set(9);
+ client.transactions().begin(new TransactionOptions().readOnly(true));
+ client.transactions().begin(new TransactionOptions().readOnly(true));
+ assertEquals(11, lastObservableTimestamp());
+ }
+
+ private static @Nullable Long lastObservableTimestamp() {
+ HybridTimestamp ts = ignite.txManager().lastObservableTimestamp();
+
+ return ts == null ? null : ts.longValue() >> LOGICAL_TIME_BITS_SIZE;
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
index e28efff465..cbdcc47435 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
@@ -65,7 +65,7 @@ public class RequestBalancingTest extends
BaseIgniteAbstractTest {
assertTrue(IgniteTestUtils.waitForCondition(() ->
client.connections().size() == 3, 3000));
// Execute on unknown node to fall back to balancing.
- List<String> res = IntStream.range(0, 5)
+ List<Object> res = IntStream.range(0, 5)
.mapToObj(i ->
client.compute().<String>executeAsync(getClusterNodes("s123"), List.of(),
"job").join())
.collect(Collectors.toList());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index c91537c005..3c439c2242 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -40,11 +40,13 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.configuration.AuthenticationConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.manager.IgniteComponent;
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
import
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -78,6 +80,9 @@ public class TestClientHandlerModule implements
IgniteComponent {
/** Metrics. */
private final ClientHandlerMetricSource metrics;
+ /** Clock. */
+ private final HybridClock clock;
+
/** Netty channel. */
private volatile Channel channel;
@@ -99,6 +104,7 @@ public class TestClientHandlerModule implements
IgniteComponent {
* @param compute Compute.
* @param clusterId Cluster id.
* @param metrics Metrics.
+ * @param clock Clock.
*/
public TestClientHandlerModule(
Ignite ignite,
@@ -110,7 +116,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
IgniteCompute compute,
UUID clusterId,
ClientHandlerMetricSource metrics,
- AuthenticationConfiguration authenticationConfiguration) {
+ AuthenticationConfiguration authenticationConfiguration,
+ HybridClock clock) {
assert ignite != null;
assert registry != null;
assert bootstrapFactory != null;
@@ -125,6 +132,7 @@ public class TestClientHandlerModule implements
IgniteComponent {
this.clusterId = clusterId;
this.metrics = metrics;
this.authenticationConfiguration = authenticationConfiguration;
+ this.clock = clock;
}
/** {@inheritDoc} */
@@ -184,7 +192,7 @@ public class TestClientHandlerModule implements
IgniteComponent {
new ResponseDelayHandler(responseDelay),
new ClientInboundMessageHandler(
(IgniteTablesInternal) ignite.tables(),
- ignite.transactions(),
+ (IgniteTransactionsImpl)
ignite.transactions(),
mock(QueryProcessor.class),
configuration,
compute,
@@ -192,7 +200,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
ignite.sql(),
clusterId,
metrics,
-
authenticationManager(authenticationConfiguration)));
+
authenticationManager(authenticationConfiguration),
+ clock));
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
configuration.connectTimeout());
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 1aba74f063..3be82094ea 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -45,12 +45,15 @@ import
org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
import
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -94,6 +97,33 @@ public class TestServer implements AutoCloseable {
null,
UUID.randomUUID(),
null,
+ null,
+ null
+ );
+ }
+
+ /**
+ * Constructor.
+ */
+ public TestServer(
+ long idleTimeout,
+ Ignite ignite,
+ @Nullable Function<Integer, Boolean> shouldDropConnection,
+ @Nullable Function<Integer, Integer> responseDelay,
+ @Nullable String nodeName,
+ UUID clusterId,
+ @Nullable AuthenticationConfiguration authenticationConfiguration,
+ @Nullable Integer port
+ ) {
+ this(
+ idleTimeout,
+ ignite,
+ shouldDropConnection,
+ responseDelay,
+ nodeName,
+ clusterId,
+ authenticationConfiguration,
+ port,
null
);
}
@@ -112,7 +142,8 @@ public class TestServer implements AutoCloseable {
@Nullable String nodeName,
UUID clusterId,
@Nullable AuthenticationConfiguration authenticationConfiguration,
- @Nullable Integer port
+ @Nullable Integer port,
+ @Nullable HybridClock clock
) {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
@@ -156,6 +187,11 @@ public class TestServer implements AutoCloseable {
AuthenticationConfiguration authenticationConfigToApply =
authenticationConfiguration == null
? mock(AuthenticationConfiguration.class)
: authenticationConfiguration;
+
+ if (clock == null) {
+ clock = new HybridClockImpl();
+ }
+
module = shouldDropConnection != null
? new TestClientHandlerModule(
ignite,
@@ -167,11 +203,12 @@ public class TestServer implements AutoCloseable {
compute,
clusterId,
metrics,
- authenticationConfigToApply)
+ authenticationConfigToApply,
+ clock)
: new ClientHandlerModule(
((FakeIgnite) ignite).queryEngine(),
(IgniteTablesInternal) ignite.tables(),
- ignite.transactions(),
+ (IgniteTransactionsImpl) ignite.transactions(),
cfg,
compute,
clusterService,
@@ -181,7 +218,8 @@ public class TestServer implements AutoCloseable {
mock(MetricManager.class),
metrics,
authenticationManager(authenticationConfigToApply),
- authenticationConfigToApply
+ authenticationConfigToApply,
+ clock
);
module.start();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index cd692c8726..c9eb41018e 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -18,26 +18,17 @@
package org.apache.ignite.client.fakes;
import java.util.Collection;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxState;
-import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
-import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionException;
-import org.apache.ignite.tx.TransactionOptions;
-import org.jetbrains.annotations.NotNull;
/**
* Fake Ignite.
@@ -47,6 +38,8 @@ public class FakeIgnite implements Ignite {
private final HybridClock clock = new HybridClockImpl();
+ private final FakeTxManager txMgr = new FakeTxManager(clock);
+
/**
* Default constructor.
*/
@@ -79,91 +72,7 @@ public class FakeIgnite implements Ignite {
/** {@inheritDoc} */
@Override
public IgniteTransactions transactions() {
- return new IgniteTransactions() {
- @Override
- public Transaction begin(TransactionOptions options) {
- return beginAsync(options).join();
- }
-
- @Override
- public CompletableFuture<Transaction>
beginAsync(TransactionOptions options) {
- return CompletableFuture.completedFuture(new
InternalTransaction() {
- private final UUID id = UUID.randomUUID();
-
- private final HybridTimestamp timestamp = clock.now();
-
- @Override
- public @NotNull UUID id() {
- return id;
- }
-
- @Override
- public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
- return null;
- }
-
- @Override
- public TxState state() {
- return null;
- }
-
- @Override
- public boolean assignCommitPartition(TablePartitionId
tablePartitionId) {
- return false;
- }
-
- @Override
- public TablePartitionId commitPartition() {
- return null;
- }
-
- @Override
- public IgniteBiTuple<ClusterNode, Long> enlist(
- TablePartitionId tablePartitionId,
- IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
- return null;
- }
-
- @Override
- public void enlistResultFuture(CompletableFuture<?>
resultFuture) {}
-
- @Override
- public void commit() throws TransactionException {
-
- }
-
- @Override
- public CompletableFuture<Void> commitAsync() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public void rollback() throws TransactionException {
-
- }
-
- @Override
- public CompletableFuture<Void> rollbackAsync() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public boolean isReadOnly() {
- return false;
- }
-
- @Override
- public HybridTimestamp readTimestamp() {
- return null;
- }
-
- @Override
- public HybridTimestamp startTimestamp() {
- return timestamp;
- }
- });
- }
- };
+ return new IgniteTransactionsImpl(txMgr);
}
/** {@inheritDoc} */
@@ -201,4 +110,8 @@ public class FakeIgnite implements Ignite {
public String name() {
return name;
}
+
+ public FakeTxManager txManager() {
+ return txMgr;
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
new file mode 100644
index 0000000000..11f33056c3
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.fakes;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Fake transaction manager.
+ */
+public class FakeTxManager implements TxManager {
+ private final HybridClock clock;
+
+ private HybridTimestamp lastObservableTimestamp = null;
+
+ public FakeTxManager(HybridClock clock) {
+ this.clock = clock;
+ }
+
+ @Override
+ public void start() {
+ // No-op.
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // No-op.
+ }
+
+ @Override
+ public InternalTransaction begin() {
+ return begin(false, null);
+ }
+
+ @Override
+ public InternalTransaction begin(boolean readOnly, @Nullable
HybridTimestamp observableTimestamp) {
+ lastObservableTimestamp = observableTimestamp;
+
+ return new InternalTransaction() {
+ private final UUID id = UUID.randomUUID();
+
+ private final HybridTimestamp timestamp = clock.now();
+
+ @Override
+ public @NotNull UUID id() {
+ return id;
+ }
+
+ @Override
+ public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
+ return null;
+ }
+
+ @Override
+ public TxState state() {
+ return null;
+ }
+
+ @Override
+ public boolean assignCommitPartition(TablePartitionId
tablePartitionId) {
+ return false;
+ }
+
+ @Override
+ public TablePartitionId commitPartition() {
+ return null;
+ }
+
+ @Override
+ public IgniteBiTuple<ClusterNode, Long> enlist(
+ TablePartitionId tablePartitionId,
+ IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+ return null;
+ }
+
+ @Override
+ public void enlistResultFuture(CompletableFuture<?> resultFuture) {
+ }
+
+ @Override
+ public void commit() throws TransactionException {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> commitAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void rollback() throws TransactionException {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> rollbackAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public boolean isReadOnly() {
+ return false;
+ }
+
+ @Override
+ public HybridTimestamp readTimestamp() {
+ return observableTimestamp;
+ }
+
+ @Override
+ public HybridTimestamp startTimestamp() {
+ return timestamp;
+ }
+ };
+ }
+
+ @Override
+ public @Nullable TxState state(UUID txId) {
+ return null;
+ }
+
+ @Override
+ public void changeState(UUID txId, @Nullable TxState before, TxState
after) {
+
+ }
+
+ @Override
+ public LockManager lockManager() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> finish(TablePartitionId commitPartition,
ClusterNode recipientNode, Long term, boolean commit,
+ Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>>
groups, UUID txId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> cleanup(ClusterNode recipientNode,
List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+ UUID txId, boolean commit, @Nullable HybridTimestamp
commitTimestamp) {
+ return null;
+ }
+
+ @Override
+ public int finished() {
+ return 0;
+ }
+
+ @Override
+ public int pending() {
+ return 0;
+ }
+
+ @Override
+ public CompletableFuture<Void> updateLowWatermark(HybridTimestamp
newLowWatermark) {
+ return null;
+ }
+
+ public @Nullable HybridTimestamp lastObservableTimestamp() {
+ return lastObservableTimestamp;
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index 6a681d8e26..63041b71f2 100644
---
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -204,11 +204,16 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
@Override
public void
addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener) {
+ // No-op.
+ }
+ @Override
+ public void addObservableTimestampListener(Consumer<Long> listener) {
+ // No-op.
}
@Override
- public void close() throws Exception {
+ public void close() {
}
}
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index e90ad52ecb..1c9972cbd2 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -91,6 +91,9 @@ void node_connection::process_message(bytes_view msg) {
auto flags = reader.read_int32();
UNUSED_VALUE flags; // Flags are unused for now.
+ auto observable_timestamp = reader.read_int64();
+ UNUSED_VALUE observable_timestamp; // // TODO IGNITE-20057 C++ client:
Track observable timestamp
+
auto handler = get_and_remove_handler(req_id);
if (!handler) {
m_logger->log_error("Missing handler for request with id=" +
std::to_string(req_id));
diff --git a/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
index 39e4eff4c5..10a36d2257 100644
--- a/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
@@ -65,27 +65,29 @@ void sql_impl::execute_async(transaction *tx, const
sql_statement &statement, st
if (args.empty()) {
writer.write_nil();
- return;
- }
+ } else {
+ auto args_num = std::int32_t(args.size());
- auto args_num = std::int32_t(args.size());
+ writer.write(args_num);
- writer.write(args_num);
+ binary_tuple_builder args_builder{args_num * 3};
- binary_tuple_builder args_builder{args_num * 3};
+ args_builder.start();
+ for (const auto &arg : args) {
+ protocol::claim_primitive_with_type(args_builder, arg);
+ }
- args_builder.start();
- for (const auto &arg : args) {
- protocol::claim_primitive_with_type(args_builder, arg);
- }
+ args_builder.layout();
+ for (const auto &arg : args) {
+ protocol::append_primitive_with_type(args_builder, arg);
+ }
- args_builder.layout();
- for (const auto &arg : args) {
- protocol::append_primitive_with_type(args_builder, arg);
+ auto args_data = args_builder.build();
+ writer.write_binary(args_data);
}
- auto args_data = args_builder.build();
- writer.write_binary(args_data);
+ // TODO IGNITE-20057 C++ client: Track observable timestamp
+ writer.write(0); // observableTimestamp.
};
auto reader_func = [](std::shared_ptr<node_connection> channel, bytes_view
msg) -> result_set {
diff --git
a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
index 1b1aa3c943..98f792367c 100644
--- a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
@@ -57,6 +57,9 @@ public:
IGNITE_API void begin_async(ignite_callback<transaction> callback) {
auto writer_func = [](protocol::writer &writer) {
writer.write_bool(false); // readOnly.
+
+ // TODO IGNITE-20057 C++ client: Track observable timestamp
+ writer.write(0); // observableTimestamp.
};
auto reader_func = [](protocol::reader &reader,
std::shared_ptr<node_connection> conn) mutable -> transaction {
diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
index f59b116eba..9d039567c0 100644
--- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
+++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
@@ -344,6 +344,9 @@ sql_result data_query::make_request_execute() {
writer.write(m_query);
m_params.write(writer);
+
+ // TODO IGNITE-20057 C++ client: Track observable timestamp
+ writer.write(0); // observableTimestamp.
});
m_connection.mark_transaction_non_empty();
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
index 81b14139d9..9b3130da7d 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
@@ -325,6 +325,9 @@ network::data_buffer_owning
sql_connection::receive_message(std::int64_t id, std
auto flags = reader.read_int32();
UNUSED_VALUE flags; // Flags are unused for now.
+ auto observable_timestamp = reader.read_int64();
+ UNUSED_VALUE observable_timestamp; // // TODO IGNITE-20057 C++ client:
Track observable timestamp
+
auto err = protocol::read_error(reader);
if (err) {
throw odbc_error(sql_state::SHY000_GENERAL_ERROR,
err.value().what_str());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 96dadabeff..81c668e199 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -339,6 +339,7 @@ namespace Apache.Ignite.Tests
writer.Write(0); // Message type.
writer.Write(requestId);
writer.Write(PartitionAssignmentChanged ?
(int)ResponseFlags.PartitionAssignmentChanged : 0);
+ writer.Write(0); // Observable timestamp.
if (!isError)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 5f0258b066..501fb2e362 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -97,7 +97,7 @@ public class MetricsTests
await client.Tables.GetTablesAsync();
Assert.AreEqual(17, _listener.GetMetric("bytes-sent"));
- Assert.AreEqual(72, _listener.GetMetric("bytes-received"));
+ Assert.AreEqual(73, _listener.GetMetric("bytes-received"));
}
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
index 10703c8d9f..8034286d48 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
@@ -17,7 +17,6 @@
namespace Apache.Ignite
{
- using System;
using System.Diagnostics.CodeAnalysis;
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index f9319fc3d7..d61dd176e0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -700,6 +700,9 @@ namespace Apache.Ignite.Internal
_assignmentChangeCallback(this);
}
+ // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
+ _ = reader.ReadInt64();
+
var exception = ReadError(ref reader);
if (exception != null)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index ad8a47ec5b..4476eab85b 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -419,6 +419,11 @@ internal readonly ref struct MsgPackWriter
Write(col.Count);
+ if (col.Count == 0)
+ {
+ return;
+ }
+
using var builder = new BinaryTupleBuilder(col.Count * 3);
foreach (var obj in col)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 4d93e33270..2d23d8b4d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -190,6 +190,9 @@ namespace Apache.Ignite.Internal.Sql
w.Write(statement.Query);
w.WriteObjectCollectionAsBinaryTuple(args);
+ // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
+ w.Write(0);
+
return writer;
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
index 2e875a603d..31129c944c 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
@@ -43,7 +43,7 @@ namespace Apache.Ignite.Internal.Transactions
public async Task<ITransaction> BeginAsync(TransactionOptions options)
{
using var writer = ProtoCommon.GetMessageWriter();
- writer.MessageWriter.Write(options.ReadOnly);
+ Write();
// Transaction and all corresponding operations must be performed
using the same connection.
var (resBuf, socket) = await
_socket.DoOutInOpAndGetSocketAsync(ClientOp.TxBegin, request:
writer).ConfigureAwait(false);
@@ -54,6 +54,15 @@ namespace Apache.Ignite.Internal.Transactions
return new Transaction(txId, socket, _socket,
options.ReadOnly);
}
+
+ void Write()
+ {
+ var w = writer.MessageWriter;
+ w.Write(options.ReadOnly);
+
+ // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
+ w.Write(0);
+ }
}
/// <inheritdoc />
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index de5e7e783a..27eaf4004f 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -605,7 +605,8 @@ public class IgniteImpl implements Ignite {
metricManager,
new ClientHandlerMetricSource(),
authenticationManager,
- authenticationConfiguration
+ authenticationConfiguration,
+ clock
);
restComponent = createRestComponent(name);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 58dc4f93f0..074d54a197 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -55,7 +55,7 @@ public interface TxManager extends IgniteComponent {
* @throws IgniteInternalException with {@link
Transactions#TX_READ_ONLY_TOO_OLD_ERR} if transaction much older than the data
available
* in the tables.
*/
- InternalTransaction begin(boolean readOnly, HybridTimestamp
observableTimestamp);
+ InternalTransaction begin(boolean readOnly, @Nullable HybridTimestamp
observableTimestamp);
/**
* Returns a transaction state.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index a4b8ea297f..bb67f6232b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.tx.impl;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
@@ -39,25 +41,34 @@ public class IgniteTransactionsImpl implements
IgniteTransactions {
this.txManager = txManager;
}
- /** {@inheritDoc} */
- @Override
- public Transaction begin(@Nullable TransactionOptions options) {
+ /**
+ * Begins a transaction.
+ *
+ * @param options Transaction options.
+ * @param observableTimestamp Observable timestamp, applicable only for
read-only transactions. Read-only transactions
+ * can use some time to the past to avoid waiting for time that is
safe for reading on non-primary replica. To do so, client
+ * should provide this observable timestamp that is calculated
according to the commit time of the latest read-write transaction,
+ * to guarantee that read-only transaction will see the modified data.
+ * @return The started transaction.
+ */
+ public InternalTransaction begin(@Nullable TransactionOptions options,
@Nullable HybridTimestamp observableTimestamp) {
if (options != null && options.timeoutMillis() != 0) {
// TODO: IGNITE-15936.
throw new UnsupportedOperationException("Timeouts are not
supported yet");
}
- return txManager.begin(options != null && options.readOnly(), null);
+ return txManager.begin(options != null && options.readOnly(),
observableTimestamp);
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<Transaction> beginAsync(@Nullable
TransactionOptions options) {
- if (options != null && options.timeoutMillis() != 0) {
- // TODO: IGNITE-15936.
- throw new UnsupportedOperationException("Timeouts are not
supported yet");
- }
+ public Transaction begin(@Nullable TransactionOptions options) {
+ return begin(options, null);
+ }
- return CompletableFuture.completedFuture(txManager.begin(options !=
null && options.readOnly(), null));
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Transaction> beginAsync(@Nullable
TransactionOptions options) {
+ return CompletableFuture.completedFuture(begin(options, null));
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index f60d5720dd..f1147dd08c 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -117,7 +117,7 @@ public class TxManagerImpl implements TxManager {
}
@Override
- public InternalTransaction begin(boolean readOnly, HybridTimestamp
observableTimestamp) {
+ public InternalTransaction begin(boolean readOnly, @Nullable
HybridTimestamp observableTimestamp) {
assert readOnly || observableTimestamp == null : "Observable timestamp
is applicable just for read-only transactions.";
HybridTimestamp beginTimestamp = clock.now();