This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7547d173484 IGNITE-22090 Piggyback tx start in first tx request
7547d173484 is described below
commit 7547d1734846b11e53c682e2b03ca5b4def808af
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Tue May 27 09:28:38 2025 +0300
IGNITE-22090 Piggyback tx start in first tx request
---
.../client/proto/ProtocolBitmaskFeature.java | 7 +-
.../internal/client/proto/tx/ClientTxUtils.java | 5 +-
.../ignite/client/handler/ItClientHandlerTest.java | 8 +-
.../ignite/client/handler/ClientHandlerModule.java | 3 +-
.../handler/ClientInboundMessageHandler.java | 29 ++++-
.../requests/sql/ClientSqlExecuteBatchRequest.java | 2 +-
.../requests/sql/ClientSqlExecuteRequest.java | 2 +-
.../sql/ClientSqlQueryMetadataRequest.java | 2 +-
.../handler/requests/table/ClientTableCommon.java | 101 ++++++++++----
.../table/ClientTupleContainsAllKeysRequest.java | 2 +-
.../table/ClientTupleContainsKeyRequest.java | 2 +-
.../table/ClientTupleDeleteAllExactRequest.java | 2 +-
.../table/ClientTupleDeleteAllRequest.java | 2 +-
.../table/ClientTupleDeleteExactRequest.java | 2 +-
.../requests/table/ClientTupleDeleteRequest.java | 2 +-
.../requests/table/ClientTupleGetAllRequest.java | 2 +-
.../table/ClientTupleGetAndDeleteRequest.java | 2 +-
.../table/ClientTupleGetAndReplaceRequest.java | 2 +-
.../table/ClientTupleGetAndUpsertRequest.java | 2 +-
.../requests/table/ClientTupleGetRequest.java | 2 +-
.../table/ClientTupleInsertAllRequest.java | 2 +-
.../requests/table/ClientTupleInsertRequest.java | 2 +-
.../table/ClientTupleReplaceExactRequest.java | 2 +-
.../requests/table/ClientTupleReplaceRequest.java | 2 +-
.../requests/table/ClientTupleRequestBase.java | 15 ++-
.../table/ClientTupleUpsertAllRequest.java | 2 +-
.../requests/table/ClientTupleUpsertRequest.java | 2 +-
.../requests/table/ClientTuplesRequestBase.java | 14 +-
.../requests/tx/ClientTransactionBeginRequest.java | 44 +------
.../tx/ClientTransactionCommitRequest.java | 12 +-
.../ignite/internal/client/ReliableChannel.java | 35 +++--
.../ignite/internal/client/TcpClientChannel.java | 3 +-
.../ignite/internal/client/WriteContext.java | 12 +-
.../internal/client/compute/ClientCompute.java | 2 -
.../client/compute/ClientJobExecution.java | 4 -
.../ignite/internal/client/sql/ClientSql.java | 4 +-
.../internal/client/table/ClientDataStreamer.java | 1 -
.../ignite/internal/client/table/ClientTable.java | 145 +++++++++++++++------
.../internal/client/tx/ClientLazyTransaction.java | 54 +++++---
.../internal/client/tx/ClientTransaction.java | 25 ++--
.../internal/client/tx/ClientTransactions.java | 47 ++++---
.../org/apache/ignite/client/ConnectionTest.java | 4 +-
.../ignite/client/FeatureCompatibilityTest.java | 95 ++++++++++++++
.../client/ObservableTimestampPropagationTest.java | 14 +-
.../org/apache/ignite/client/RetryPolicyTest.java | 4 +-
.../ignite/client/TestClientHandlerModule.java | 26 +++-
.../java/org/apache/ignite/client/TestServer.java | 10 +-
.../RepeatedFinishClientTransactionTest.java | 5 +-
.../app/client/ItThinClientTransactionsTest.java | 38 +++++-
.../ItThinClientTransactionsWithReplicasTest.java | 6 +-
50 files changed, 553 insertions(+), 259 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 80df673426a..78c7109d885 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -57,7 +57,12 @@ public enum ProtocolBitmaskFeature {
/**
* Delayed ack optimization for directly mapped transactions.
*/
- TX_DELAYED_ACKS(6);
+ TX_DELAYED_ACKS(6),
+
+ /**
+ * Piggyback txn start in the first request.
+ */
+ TX_PIGGYBACK(7);
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
index 86568ae61d6..151c361566e 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/tx/ClientTxUtils.java
@@ -21,6 +21,9 @@ package org.apache.ignite.internal.client.proto.tx;
* Utility class for client transactions.
*/
public class ClientTxUtils {
- /** Tx resource id for direct mapping. */
+ /** Tx resource id for directly mapped request. */
public static final long TX_ID_DIRECT = 0L;
+
+ /** Tx resource id for directly mapped first request. */
+ public static final long TX_ID_FIRST_DIRECT = -1L;
}
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index bc1aa75bbac..f20fa71956c 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -488,10 +488,13 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
packer.packInt(2); // Client type: general purpose.
BitSet clientFeatures = new BitSet();
- // Supported feature
+ // Supported features
clientFeatures.set(1);
+ clientFeatures.set(2);
+ clientFeatures.set(6);
+ clientFeatures.set(7);
// Unsupported feature
- clientFeatures.set(3);
+ clientFeatures.set(4);
packer.packBinaryHeader(clientFeatures.toByteArray().length); //
Features.);
packer.writePayload(clientFeatures.toByteArray());
@@ -538,6 +541,7 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
expected.set(3);
expected.set(5);
expected.set(6);
+ expected.set(7);
assertEquals(expected, supportedFeatures);
var extensionsLen = unpacker.unpackInt();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 055c4c51176..d5b691d8b85 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -88,7 +88,8 @@ public class ClientHandlerModule implements IgniteComponent,
PlatformComputeTran
ProtocolBitmaskFeature.TX_DIRECT_MAPPING,
ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB,
ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS,
- ProtocolBitmaskFeature.TX_DELAYED_ACKS
+ ProtocolBitmaskFeature.TX_DELAYED_ACKS,
+ ProtocolBitmaskFeature.TX_PIGGYBACK
));
/** Connection id generator.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index ad4373ca7d5..d7cd60a0758 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLA
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.STREAMER_RECEIVER_EXECUTION_OPTIONS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
@@ -476,11 +477,26 @@ public class ClientInboundMessageHandler
BitSet clientFeatures,
ProtocolVersion clientVer,
int clientCode) {
- BitSet mutuallySupportedFeatures =
HandshakeUtils.supportedFeatures(features, clientFeatures);
+ // Disable direct mapping if not all required features are supported.
+ boolean supportsDirectMapping =
features.get(TX_DIRECT_MAPPING.featureId()) &&
clientFeatures.get(TX_DIRECT_MAPPING.featureId())
+ && features.get(TX_DELAYED_ACKS.featureId()) &&
clientFeatures.get(TX_DELAYED_ACKS.featureId())
+ && features.get(TX_PIGGYBACK.featureId()) &&
clientFeatures.get(TX_PIGGYBACK.featureId());
- clientContext = new ClientContext(clientVer, clientCode,
mutuallySupportedFeatures, user);
+ BitSet actualFeatures;
- sendHandshakeResponse(ctx, packer);
+ if (!supportsDirectMapping) {
+ actualFeatures = (BitSet) this.features.clone();
+
+ actualFeatures.clear(TX_DIRECT_MAPPING.featureId());
+ actualFeatures.clear(TX_DELAYED_ACKS.featureId());
+ actualFeatures.clear(TX_PIGGYBACK.featureId());
+ } else {
+ actualFeatures = this.features;
+ }
+
+ clientContext = new ClientContext(clientVer, clientCode,
HandshakeUtils.supportedFeatures(actualFeatures, clientFeatures), user);
+
+ sendHandshakeResponse(ctx, packer, actualFeatures);
}
private void handshakeError(ChannelHandlerContext ctx, ClientMessagePacker
packer, Throwable t) {
@@ -508,7 +524,7 @@ public class ClientInboundMessageHandler
metrics.sessionsRejectedIncrement();
}
- private void sendHandshakeResponse(ChannelHandlerContext ctx,
ClientMessagePacker packer) {
+ private void sendHandshakeResponse(ChannelHandlerContext ctx,
ClientMessagePacker packer, BitSet mutuallySupportedFeatures) {
ProtocolVersion.LATEST_VER.pack(packer);
packer.packNil(); // No error.
@@ -538,7 +554,7 @@ public class ClientInboundMessageHandler
packer.packByteNullable(IgniteProductVersion.CURRENT_VERSION.patch());
packer.packStringNullable(IgniteProductVersion.CURRENT_VERSION.preRelease());
- HandshakeUtils.packFeatures(packer, features);
+ HandshakeUtils.packFeatures(packer, mutuallySupportedFeatures);
HandshakeUtils.packExtensions(packer, extensions);
write(packer, ctx);
@@ -854,8 +870,7 @@ public class ClientInboundMessageHandler
return ClientJdbcPrimaryKeyMetadataRequest.process(in,
jdbcQueryEventHandler);
case ClientOp.TX_BEGIN:
- return ClientTransactionBeginRequest.process(in, txManager,
resources, metrics, igniteTables,
- clientContext.hasAllFeatures(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS), tsTracker);
+ return ClientTransactionBeginRequest.process(in, txManager,
resources, metrics, tsTracker);
case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources,
metrics, clockService, igniteTables,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 085cdf1795d..2b7d560f339 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -63,7 +63,7 @@ public class ClientSqlExecuteBatchRequest {
CancelHandle cancelHandle = CancelHandle.create();
cancelHandleMap.put(requestId, cancelHandle);
- InternalTransaction tx = readTx(in, tsTracker, resources, null, null);
+ InternalTransaction tx = readTx(in, tsTracker, resources, null, null,
null);
ClientSqlProperties props = new ClientSqlProperties(in);
String statement = in.unpackString();
BatchedArguments arguments = readArgs(in);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 169b88651e9..93f3afd96a6 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -81,7 +81,7 @@ public class ClientSqlExecuteRequest {
CancelHandle cancelHandle = CancelHandle.create();
cancelHandles.put(requestId, cancelHandle);
- InternalTransaction tx = readTx(in, tsUpdater, resources, null, null);
+ InternalTransaction tx = readTx(in, tsUpdater, resources, null, null,
null);
ClientSqlProperties props = new ClientSqlProperties(in);
String statement = in.unpackString();
Object[] arguments = readArgsNotNull(in);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 83f83c3e227..4437b70536b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -51,7 +51,7 @@ public class ClientSqlQueryMetadataRequest {
ClientResourceRegistry resources,
HybridTimestampTracker tsTracker
) {
- var tx = readTx(in, tsTracker, resources, null, null);
+ var tx = readTx(in, tsTracker, resources, null, null, null);
String schema = in.unpackString();
String query = in.unpackString();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 7d31321be48..3291b83d25b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.table;
import static
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
+import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
@@ -26,6 +27,7 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
@@ -314,29 +316,61 @@ public class ClientTableCommon {
* Write tx metadata.
*
* @param out Packer.
+ * @param tsTracker Timestamp tracker.
* @param clockService Clock service.
- * @param tx The transaction.
+ * @param req Request.
*/
- public static void writeTxMeta(
- ClientMessagePacker out, HybridTimestampTracker tsTracker,
@Nullable ClockService clockService, InternalTransaction tx) {
- if (!tx.remote()) {
- return;
- }
+ static void writeTxMeta(
+ ClientMessagePacker out, HybridTimestampTracker tsTracker,
@Nullable ClockService clockService, ClientTupleRequestBase req) {
+ writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
+ }
- TxState state = tx.state();
+ /**
+ * Write tx metadata.
+ *
+ * @param out Packer.
+ * @param tsTracker Timestamp tracker.
+ * @param clockService Clock service.
+ * @param req Request.
+ */
+ static void writeTxMeta(
+ ClientMessagePacker out, HybridTimestampTracker tsTracker,
@Nullable ClockService clockService, ClientTuplesRequestBase req) {
+ writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
+ }
- if (state == TxState.ABORTED) {
- // No-op enlistment.
- out.packNil();
- } else {
- // Remote tx carries operation enlistment info.
- PendingTxPartitionEnlistment token = tx.enlistedPartition(null);
- out.packString(token.primaryNodeConsistentId());
- out.packLong(token.consistencyToken());
- }
+ /**
+ * Write tx metadata.
+ *
+ * @param out Packer.
+ * @param tsTracker Timestamp tracker.
+ * @param clockService Clock service.
+ * @param tx Transaction.
+ * @param resourceId Resource id.
+ */
+ public static void writeTxMeta(ClientMessagePacker out,
HybridTimestampTracker tsTracker, @Nullable ClockService clockService,
+ InternalTransaction tx, long resourceId) {
+ if (resourceId != 0) {
+ // Resource id is assigned on a first request in direct mode.
+ out.packLong(resourceId);
+ out.packUuid(tx.id());
+ out.packUuid(tx.coordinatorId());
+ out.packLong(tx.getTimeout());
+ } else if (tx.remote()) {
+ TxState state = tx.state();
+
+ if (state == TxState.ABORTED) {
+ // No-op enlistment.
+ out.packNil();
+ } else {
+ // Remote tx carries operation enlistment info.
+ PendingTxPartitionEnlistment token =
tx.enlistedPartition(null);
+ out.packString(token.primaryNodeConsistentId());
+ out.packLong(token.consistencyToken());
+ }
- if (clockService != null) {
- tsTracker.update(clockService.current());
+ if (clockService != null) {
+ tsTracker.update(clockService.current());
+ }
}
}
@@ -358,6 +392,7 @@ public class ClientTableCommon {
* @param resources Resource registry.
* @param txManager Tx manager.
* @param notificationSender Notification sender.
+ * @param resourceIdHolder Resource id holder.
* @return Transaction, if present, or null.
*/
public static @Nullable InternalTransaction readTx(
@@ -365,15 +400,34 @@ public class ClientTableCommon {
HybridTimestampTracker tsUpdater,
ClientResourceRegistry resources,
@Nullable TxManager txManager,
- @Nullable NotificationSender notificationSender
- ) {
+ @Nullable NotificationSender notificationSender,
+ long[] resourceIdHolder) {
if (in.tryUnpackNil()) {
return null;
}
try {
long id = in.unpackLong();
- if (id == TX_ID_DIRECT) {
+ if (id == TX_ID_FIRST_DIRECT) {
+ long observableTs = in.unpackLong();
+
+ // This is first mapping request, which piggybacks transaction
creation.
+ boolean readOnly = in.unpackBoolean();
+ long timeoutMillis = in.unpackLong();
+
+ InternalTxOptions txOptions = InternalTxOptions.builder()
+ .timeoutMillis(timeoutMillis)
+ .build();
+
+ var tx = startExplicitTx(tsUpdater, txManager,
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly,
+ txOptions);
+
+ // Attach resource id only on first direct request.
+ resourceIdHolder[0] = resources.put(new ClientResource(tx,
tx::rollbackAsync));
+
+ return tx;
+ } else if (id == TX_ID_DIRECT) {
+ // This is direct request mapping.
long token = in.unpackLong();
UUID txId = in.unpackUuid();
int commitTableId = in.unpackInt();
@@ -416,8 +470,9 @@ public class ClientTableCommon {
ClientResourceRegistry resources,
TxManager txManager,
boolean readOnly,
- @Nullable NotificationSender notificationSender) {
- InternalTransaction tx = readTx(in, readTs, resources, txManager,
notificationSender);
+ @Nullable NotificationSender notificationSender,
+ long[] resourceIdHolder) {
+ InternalTransaction tx = readTx(in, readTs, resources, txManager,
notificationSender, resourceIdHolder);
if (tx == null) {
// Implicit transactions do not use an observation timestamp
because RW never depends on it, and implicit RO is always direct.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
index 74ddb89b931..89c37b08da7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
@@ -53,7 +53,7 @@ public class ClientTupleContainsAllKeysRequest {
return ClientTuplesRequestBase.readAsync(in, tables, resources,
txManager, false, null, tsTracker, true)
.thenCompose(req ->
req.table().recordView().containsAllAsync(req.tx(), req.tuples())
.thenApply(containsAll -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(containsAll);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
index 55aaa42c8dd..e08a309c665 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
@@ -52,7 +52,7 @@ public class ClientTupleContainsKeyRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, true, null, tsTracker, true)
.thenCompose(req ->
req.table().recordView().containsAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index 4d8607ebdd7..026983cfa18 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleDeleteAllExactRequest {
return ClientTuplesRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples,
req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index 15d604278b4..660f55c0a28 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -56,7 +56,7 @@ public class ClientTupleDeleteAllRequest {
return ClientTuplesRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, true)
.thenCompose(req ->
req.table().recordView().deleteAllAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples, TuplePart.KEY,
req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index abd586779a5..c1a51fe3e22 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleDeleteExactRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().deleteExactAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index d537c446b12..3020caac20d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleDeleteRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, true)
.thenCompose(req ->
req.table().recordView().deleteAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index f1fa7a9119e..d9a4e90f52b 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAllRequest {
return ClientTuplesRequestBase.readAsync(in, tables, resources,
txManager, false, null, tsTracker, true)
.thenCompose(req ->
req.table().recordView().getAllAsync(req.tx(), req.tuples())
.thenApply(resTuples -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
writeTuplesNullable(out, resTuples,
TuplePart.KEY_AND_VAL, req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index 6922fa171aa..d03003add7e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAndDeleteRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, true)
.thenCompose(req ->
req.table().recordView().getAndDeleteAsync(req.tx(), req.tuple())
.thenApply(resTuple -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
ClientTableCommon.writeTupleOrNil(out, resTuple,
TuplePart.KEY_AND_VAL, req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index bf53fe42ddd..a6a27b32991 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAndReplaceRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().getAndReplaceAsync(req.tx(), req.tuple())
.thenApply(resTuple -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
ClientTableCommon.writeTupleOrNil(out, resTuple,
TuplePart.KEY_AND_VAL, req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index a5c0630d8fd..26aae656f79 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -55,7 +55,7 @@ public class ClientTupleGetAndUpsertRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().getAndUpsertAsync(req.tx(), req.tuple())
.thenApply(resTuple -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
ClientTableCommon.writeTupleOrNil(out, resTuple,
TuplePart.KEY_AND_VAL, req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index 148811f1ca5..234e2964841 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -53,7 +53,7 @@ public class ClientTupleGetRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, true, null, tsTracker, true)
.thenCompose(req ->
req.table().recordView().getAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
ClientTableCommon.writeTupleOrNil(out, res,
TuplePart.KEY_AND_VAL, req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index e918c2ecd1e..f1ca252dcc5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -57,7 +57,7 @@ public class ClientTupleInsertAllRequest {
return ClientTuplesRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().insertAllAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples,
req.table().schemaView());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index f6bcf5f908f..4988f724e04 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -56,7 +56,7 @@ public class ClientTupleInsertRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().insertAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index c3a72023b1f..240fac2f29c 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleReplaceExactRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false, true)
.thenCompose(req ->
req.table().recordView().replaceAsync(req.tx(), req.tuple(), req.tuple2())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index f4fd263eaa9..49dc05f277d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleReplaceRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().replaceAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
}));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
index 0d33dc026c5..6908613a81a 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
@@ -39,12 +39,15 @@ class ClientTupleRequestBase {
private final TableViewInternal table;
private final Tuple tuple;
private final @Nullable Tuple tuple2;
+ private final long resourceId;
- private ClientTupleRequestBase(@Nullable InternalTransaction tx,
TableViewInternal table, Tuple tuple, @Nullable Tuple tuple2) {
+ private ClientTupleRequestBase(@Nullable InternalTransaction tx,
TableViewInternal table, Tuple tuple, @Nullable Tuple tuple2,
+ long resourceId) {
this.tx = tx;
this.table = table;
this.tuple = tuple;
this.tuple2 = tuple2;
+ this.resourceId = resourceId;
}
public InternalTransaction tx() {
@@ -53,6 +56,10 @@ class ClientTupleRequestBase {
return tx;
}
+ public long resourceId() {
+ return resourceId;
+ }
+
public TableViewInternal table() {
return table;
}
@@ -95,9 +102,11 @@ class ClientTupleRequestBase {
int tableId = in.unpackInt();
+ long[] resIdHolder = {0};
+
InternalTransaction tx = txManager == null
? null
- : readOrStartImplicitTx(in, tsTracker, resources, txManager,
txReadOnly, notificationSender);
+ : readOrStartImplicitTx(in, tsTracker, resources, txManager,
txReadOnly, notificationSender, resIdHolder);
int schemaId = in.unpackInt();
@@ -113,7 +122,7 @@ class ClientTupleRequestBase {
var tuple = readTuple(noValueSet, tupleBytes,
keyOnly, schema);
var tuple2 = readSecondTuple ?
readTuple(noValueSet2, tupleBytes2, keyOnly, schema) : null;
- return new ClientTupleRequestBase(tx, table,
tuple, tuple2);
+ return new ClientTupleRequestBase(tx, table,
tuple, tuple2, resIdHolder[0]);
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index d60dd38a926..a11e4574be3 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -54,7 +54,7 @@ public class ClientTupleUpsertAllRequest {
return ClientTuplesRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().upsertAllAsync(req.tx(), req.tuples())
.thenApply(v -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index 1e785291c9c..38e94ccd4d0 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -56,7 +56,7 @@ public class ClientTupleUpsertRequest {
return ClientTupleRequestBase.readAsync(in, tables, resources,
txManager, false, notificationSender, tsTracker, false)
.thenCompose(req ->
req.table().recordView().upsertAsync(req.tx(), req.tuple())
.thenApply(v -> out -> {
- writeTxMeta(out, tsTracker, clockService,
req.tx());
+ writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
}));
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
index 88aae188726..c6dd6ec54d4 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
@@ -40,17 +40,23 @@ class ClientTuplesRequestBase {
private final InternalTransaction tx;
private final TableViewInternal table;
private final List<Tuple> tuples;
+ private final long resourceId;
- private ClientTuplesRequestBase(InternalTransaction tx, TableViewInternal
table, List<Tuple> tuples) {
+ private ClientTuplesRequestBase(InternalTransaction tx, TableViewInternal
table, List<Tuple> tuples, long resourceId) {
this.tx = tx;
this.table = table;
this.tuples = tuples;
+ this.resourceId = resourceId;
}
public InternalTransaction tx() {
return tx;
}
+ public long resourceId() {
+ return resourceId;
+ }
+
public TableViewInternal table() {
return table;
}
@@ -85,7 +91,9 @@ class ClientTuplesRequestBase {
) {
int tableId = in.unpackInt();
- InternalTransaction tx = readOrStartImplicitTx(in, tsTracker,
resources, txManager, txReadOnly, notificationSender);
+ long[] resIdHolder = {0};
+
+ InternalTransaction tx = readOrStartImplicitTx(in, tsTracker,
resources, txManager, txReadOnly, notificationSender, resIdHolder);
int schemaId = in.unpackInt();
@@ -108,7 +116,7 @@ class ClientTuplesRequestBase {
tuples.add(readTuple(noValueSet[i],
tupleBytes[i], keyOnly, schema));
}
- return new ClientTuplesRequestBase(tx, table,
tuples);
+ return new ClientTuplesRequestBase(tx, table,
tuples, resIdHolder[0]);
}));
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 5a33b3971c8..2e04f3f016e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.client.handler.requests.tx;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.startExplicitTx;
-import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.tableIdNotFoundException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -29,11 +28,6 @@ import
org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.lang.IgniteSystemProperties;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.table.IgniteTablesInternal;
-import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
@@ -41,8 +35,6 @@ import org.apache.ignite.internal.tx.TxManager;
* Client transaction begin request.
*/
public class ClientTransactionBeginRequest {
- private static final boolean enabledColocation =
IgniteSystemProperties.enabledColocation();
-
/**
* Processes the request.
*
@@ -50,8 +42,6 @@ public class ClientTransactionBeginRequest {
* @param txManager Transactions.
* @param resources Resources.
* @param metrics Metrics.
- * @param igniteTables Tables facade.
- * @param enableDirectMapping Direct mapping feature.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -59,26 +49,15 @@ public class ClientTransactionBeginRequest {
TxManager txManager,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
- IgniteTablesInternal igniteTables,
- boolean enableDirectMapping,
HybridTimestampTracker tsTracker
) throws IgniteInternalCheckedException {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();
- long observable = in.unpackLong();
HybridTimestamp observableTs = null;
- int tableId = -1;
- int partition = -1;
-
if (readOnly) {
// Timestamp makes sense only for read-only transactions.
- observableTs = HybridTimestamp.nullableHybridTimestamp(observable);
- } else if (enableDirectMapping) {
- // Read commit partition info.
- // It may be not available if client has not yet loaded partition
map.
- tableId = in.unpackInt();
- partition = in.unpackInt();
+ observableTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
}
InternalTxOptions txOptions = InternalTxOptions.builder()
@@ -87,22 +66,6 @@ public class ClientTransactionBeginRequest {
var tx = startExplicitTx(tsTracker, txManager, observableTs, readOnly,
txOptions);
- // Assign commit partition at the beginning to avoid races on a client.
- if (tableId != -1) {
- if (enabledColocation) {
- TableViewInternal tableView =
igniteTables.cachedTable(tableId);
-
- if (tableView == null) {
- tx.rollback();
- throw tableIdNotFoundException(tableId);
- }
-
- tx.assignCommitPartition(new
ZonePartitionId(tableView.zoneId(), partition));
- } else {
- tx.assignCommitPartition(new TablePartitionId(tableId,
partition));
- }
- }
-
if (readOnly) {
// Propagate assigned read timestamp to client to enforce
serializability on subsequent reads from another node.
tsTracker.update(tx.readTimestamp());
@@ -114,11 +77,6 @@ public class ClientTransactionBeginRequest {
return CompletableFuture.completedFuture(out -> {
out.packLong(resourceId);
-
- if (enableDirectMapping && !readOnly) {
- out.packUuid(tx.id());
- out.packUuid(tx.coordinatorId());
- }
});
} catch (IgniteInternalCheckedException e) {
tx.rollback();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index 15e9f0bd37a..114e1f8efcb 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -70,11 +70,6 @@ public class ClientTransactionCommitRequest {
// Attempt to merge server and client mappings.
if (enableDirectMapping && !tx.isReadOnly()) {
- long causality = in.unpackLong();
-
- // Update causality.
-
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
-
int cnt = in.unpackInt(); // Number of direct enlistments.
List<IgniteTuple3<TablePartitionId, String, Long>> list = new
ArrayList<>();
@@ -87,6 +82,13 @@ public class ClientTransactionCommitRequest {
list.add(new IgniteTuple3<>(new TablePartitionId(tableId,
partId), consistentId, token));
}
+ if (cnt > 0) {
+ long causality = in.unpackLong();
+
+ // Update causality.
+
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
+ }
+
Exception ex = null;
for (IgniteTuple3<TablePartitionId, String, Long> enlistment :
list) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 3b812b2bb87..f3229847db6 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.apache.ignite.client.ClientOperationType;
@@ -246,13 +247,12 @@ public final class ReliableChannel implements
AutoCloseable {
/**
* Sends request and handles response asynchronously.
*
+ * @param <T> response type.
* @param opCode Operation code.
* @param payloadWriter Payload writer.
* @param payloadReader Payload reader.
- * @param <T> response type.
* @param preferredNodeName Unique name (consistent id) of the preferred
target node. When a connection to the specified node
* exists, it will be used to handle the request; otherwise,
default connection will be used.
- * @param fallbackNodeName Fallback node name.
* @param retryPolicyOverride Retry policy override.
* @return Future for the operation.
*/
@@ -261,12 +261,11 @@ public final class ReliableChannel implements
AutoCloseable {
@Nullable PayloadWriter payloadWriter,
@Nullable PayloadReader<T> payloadReader,
@Nullable String preferredNodeName,
- @Nullable String fallbackNodeName,
@Nullable RetryPolicy retryPolicyOverride,
boolean expectNotifications
) {
return ClientFutureUtils.doWithRetryAsync(
- () -> getChannelAsync(preferredNodeName, fallbackNodeName)
+ () -> getChannelAsync(preferredNodeName)
.thenCompose(ch -> serviceAsyncInternal(opCode,
payloadWriter, payloadReader, expectNotifications, ch)),
null,
ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
@@ -279,9 +278,7 @@ public final class ReliableChannel implements AutoCloseable
{
* @param payloadWriter Payload writer.
* @param payloadReader Payload reader.
* @param <T> response type.
- * @param preferredNodeName Unique name (consistent id) of the preferred
target node. When a connection to the specified node
- * exists, it will be used to handle the request; otherwise,
default connection will be used.
- * @param fallbackNodeName Fallback node name to connect if a preferred
node connection is not available.
+ * @param channelResolver Channel resolver.
* @param retryPolicyOverride Retry policy override.
* @return Future for the operation.
*/
@@ -290,13 +287,12 @@ public final class ReliableChannel implements
AutoCloseable {
Function<ClientChannel, CompletableFuture<Void>> channelReadyCb,
@Nullable PayloadWriter payloadWriter,
@Nullable PayloadReader<T> payloadReader,
- @Nullable String preferredNodeName,
- @Nullable String fallbackNodeName,
+ Supplier<CompletableFuture<ClientChannel>> channelResolver,
@Nullable RetryPolicy retryPolicyOverride,
boolean expectNotifications
) {
return ClientFutureUtils.doWithRetryAsync(
- () -> getChannelAsync(preferredNodeName, fallbackNodeName)
+ () -> channelResolver.get()
.thenCompose(ch ->
channelReadyCb.apply(ch).thenApply(ignored -> ch))
.thenCompose(ch -> serviceAsyncInternal(opCode,
payloadWriter, payloadReader, expectNotifications, ch)),
null,
@@ -320,7 +316,7 @@ public final class ReliableChannel implements AutoCloseable
{
@Nullable PayloadReader<T> payloadReader
) {
return ClientFutureUtils.doWithRetryAsync(
- () -> getChannelAsync(null, null)
+ () -> getChannelAsync(null)
.thenCompose(ch -> {
int opCode = opCodeFunc.applyAsInt(ch);
return serviceAsyncInternal(opCode, payloadWriter,
payloadReader, false, ch);
@@ -343,7 +339,7 @@ public final class ReliableChannel implements AutoCloseable
{
PayloadWriter payloadWriter,
@Nullable PayloadReader<T> payloadReader
) {
- return serviceAsync(opCode, payloadWriter, payloadReader, null, null,
null, false);
+ return serviceAsync(opCode, payloadWriter, payloadReader, null, null,
false);
}
/**
@@ -355,7 +351,7 @@ public final class ReliableChannel implements AutoCloseable
{
* @return Future for the operation.
*/
public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T>
payloadReader) {
- return serviceAsync(opCode, null, payloadReader, null, null, null,
false);
+ return serviceAsync(opCode, null, payloadReader, null, null, false);
}
private <T> CompletableFuture<T> serviceAsyncInternal(
@@ -371,15 +367,18 @@ public final class ReliableChannel implements
AutoCloseable {
});
}
- private CompletableFuture<ClientChannel> getChannelAsync(@Nullable String
preferredNodeName, @Nullable String fallbackNodeName) {
+ /**
+ * Get the channel.
+ *
+ * @param preferredNodeName Preferred node name.
+ *
+ * @return The future.
+ */
+ public CompletableFuture<ClientChannel> getChannelAsync(@Nullable String
preferredNodeName) {
// 1. Preferred node connection.
if (preferredNodeName != null) {
ClientChannelHolder holder =
nodeChannelsByName.get(preferredNodeName);
- if (fallbackNodeName != null && holder == null) {
- holder = nodeChannelsByName.get(fallbackNodeName);
- }
-
if (holder != null) {
return holder.getOrCreateChannelAsync().thenCompose(ch -> {
if (ch != null) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 6baf36f4a7e..e1c314c3376 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -88,7 +88,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ProtocolBitmaskFeature.TABLE_GET_REQS_USE_QUALIFIED_NAME,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING,
ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB,
- ProtocolBitmaskFeature.TX_DELAYED_ACKS
+ ProtocolBitmaskFeature.TX_DELAYED_ACKS,
+ ProtocolBitmaskFeature.TX_PIGGYBACK
));
/** Minimum supported heartbeat interval. */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
index 0683a148a93..3564f3a2b0e 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/WriteContext.java
@@ -15,9 +15,11 @@
* limitations under the License.
*/
-
package org.apache.ignite.internal.client;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.jetbrains.annotations.Nullable;
/**
@@ -26,4 +28,12 @@ import org.jetbrains.annotations.Nullable;
public class WriteContext {
public @Nullable PartitionMapping pm;
public @Nullable Long enlistmentToken;
+ public CompletableFuture<ClientTransaction> firstReqFut;
+ public final HybridTimestampTracker tracker;
+ public boolean readOnly;
+ public @Nullable ClientChannel channel;
+
+ public WriteContext(HybridTimestampTracker tracker) {
+ this.tracker = tracker;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 0602694b6ff..6f7057e7c2c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -319,7 +319,6 @@ public class ClientCompute implements IgniteCompute {
ClientCompute::unpackSubmitTaskResult,
null,
null,
- null,
true
);
}
@@ -336,7 +335,6 @@ public class ClientCompute implements IgniteCompute {
ClientCompute::unpackSubmitResult,
node.name(),
null,
- null,
true
);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index efcc8ba5560..c78e3f31796 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -121,7 +121,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
ClientJobExecution::unpackJobState,
null,
null,
- null,
false
);
}
@@ -135,7 +134,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
ClientJobExecution::unpackTaskState,
null,
null,
- null,
false
);
}
@@ -149,7 +147,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
ClientJobExecution::unpackBooleanResult,
null,
null,
- null,
false
);
}
@@ -166,7 +163,6 @@ class ClientJobExecution<R> implements JobExecution<R> {
ClientJobExecution::unpackBooleanResult,
null,
null,
- null,
false
);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 14aeb37aade..3af0b29e9be 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -283,7 +283,7 @@ public class ClientSql implements IgniteSql {
if (transaction != null) {
try {
//noinspection resource
- return ClientLazyTransaction.ensureStarted(transaction, ch,
null)
+ return ClientLazyTransaction.ensureStarted(transaction,
ch).get1()
.thenCompose(tx ->
tx.channel().serviceAsync(ClientOp.SQL_EXEC, payloadWriter, payloadReader))
.exceptionally(ClientSql::handleException);
} catch (TransactionException e) {
@@ -350,7 +350,7 @@ public class ClientSql implements IgniteSql {
if (transaction != null) {
try {
//noinspection resource
- return ClientLazyTransaction.ensureStarted(transaction, ch,
null)
+ return ClientLazyTransaction.ensureStarted(transaction,
ch).get1()
.thenCompose(tx ->
tx.channel().serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter,
payloadReader))
.exceptionally(ClientSql::handleException);
} catch (TransactionException e) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
index 251701cb44b..a4300b7f014 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientDataStreamer.java
@@ -104,7 +104,6 @@ class ClientDataStreamer {
?
StreamerReceiverSerializer.deserializeReceiverResultsOnClient(in.in())
: null,
partitionAssignment.get(partitionId),
- null,
new
RetryLimitPolicy().retryLimit(options.retryLimit()),
false)
);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index e81bf06acc7..cb0ecb7a10c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -21,7 +21,9 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
+import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
import static
org.apache.ignite.internal.client.tx.ClientLazyTransaction.ensureStarted;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
@@ -35,12 +37,12 @@ import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHE
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.function.Supplier;
import org.apache.ignite.client.RetryPolicy;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.ClientSchemaVersionMismatchException;
@@ -56,6 +58,7 @@ import
org.apache.ignite.internal.client.proto.ColumnTypeConverter;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.client.table.api.PublicApiClientKeyValueView;
import org.apache.ignite.internal.client.table.api.PublicApiClientRecordView;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -311,17 +314,26 @@ public class ClientTable implements Table {
if (tx == null) {
out.out().packNil();
} else {
- ClientTransaction tx0 = ClientTransaction.get(tx);
-
- if (ctx != null && ctx.enlistmentToken != null) {
- out.out().packLong(TX_ID_DIRECT); // For direct enlistment,
pass 0 for resourceId to distinguish with proxy mode.
- out.out().packLong(ctx.enlistmentToken);
- out.out().packUuid(tx0.txId());
- out.out().packInt(tx0.commitTableId());
- out.out().packInt(tx0.commitPartition());
- out.out().packUuid(tx0.coordinatorId());
- out.out().packLong(tx0.timeout());
+ if (ctx != null && (ctx.enlistmentToken != null || ctx.firstReqFut
!= null)) {
+ if (ctx.firstReqFut != null) {
+ ClientLazyTransaction tx0 = (ClientLazyTransaction) tx;
+ out.out().packLong(TX_ID_FIRST_DIRECT);
+ out.out().packLong(ctx.tracker.get().longValue());
+ out.out().packBoolean(tx.isReadOnly());
+ out.out().packLong(tx0.timeout());
+ } else {
+ ClientTransaction tx0 = ClientTransaction.get(tx);
+ out.out().packLong(TX_ID_DIRECT);
+ out.out().packLong(ctx.enlistmentToken);
+ out.out().packUuid(tx0.txId());
+ out.out().packInt(tx0.commitTableId());
+ out.out().packInt(tx0.commitPartition());
+ out.out().packUuid(tx0.coordinatorId());
+ out.out().packLong(tx0.timeout());
+ }
} else {
+ ClientTransaction tx0 = ClientTransaction.get(tx);
+
//noinspection resource
if (tx0.channel() != out.clientChannel()) {
// Do not throw IgniteClientConnectionException to avoid
retry kicking in.
@@ -488,24 +500,64 @@ public class ClientTable implements Table {
.thenCompose(v -> {
ClientSchema schema = schemaFut.getNow(null);
- Supplier<PartitionMapping> sup =
- () -> getPreferredNodeName(tableId(), provider,
partitionsFut.getNow(null), schema, true);
- return ensureStarted(tx, ch, sup).thenCompose(tx0 -> {
- // Force coordinator mode for implicit transactions.
- @Nullable PartitionMapping forOp =
- getPreferredNodeName(tableId(), provider,
partitionsFut.getNow(null), schema, tx0 == null);
-
- WriteContext ctx = new WriteContext();
- // Force proxy mode for requests colocated with
coordinator to reduce passed enlistment info on commit.
- ctx.pm = tx0 != null && forOp != null &&
forOp.nodeConsistentId().equals(tx0.nodeName()) ? null : forOp;
+ // If a partition mapping is known apriori, a request for
explicit RW txn will be attempted in direct mode.
+ // Direct mode is only possible if:
+ // * a client's connection exists to a corresponding node.
+ // * a transaction has commit partition
+ @Nullable PartitionMapping pm =
+ getPreferredNodeName(tableId(), provider,
partitionsFut.getNow(null), schema, tx == null || tx.isReadOnly());
+
+ // Write context carries request execution details over
async chain.
+ WriteContext ctx = new
WriteContext(ch.observableTimestamp());
+
+ CompletableFuture<ClientTransaction> txStartFut;
+
+ if (tx == null) {
+ txStartFut = nullCompletedFuture();
+ } else {
+ if (pm == null) {
+ txStartFut = ensureStarted(tx, ch, () ->
ch.getChannelAsync(null)).get1();
+ } else {
+ txStartFut =
ch.getChannelAsync(pm.nodeConsistentId()).thenCompose(ch0 -> {
+ // Enough to check only TX_PIGGYBACK flag -
other tx flags are set if this flag is set.
+ boolean supports =
ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK)
+ &&
ch0.protocolContext().clusterNode().name().equals(pm.nodeConsistentId());
+
+ assert
ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
+
+
IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean> tuple =
ensureStarted(tx, ch,
+ supports ? null : () ->
completedFuture(ch0));
+
+ // If this is the first direct request in
transaction, it will also piggyback a transaction start.
+ if (tuple.get2()) {
+ ctx.pm = pm;
+ ctx.readOnly = tx.isReadOnly();
+ ctx.channel = ch0;
+ ctx.firstReqFut = tuple.get1();
+ return nullCompletedFuture();
+ } else {
+ return tuple.get1();
+ }
+ });
+ }
+ }
+ return txStartFut.thenCompose(tx0 -> {
return ch.serviceAsync(opCode,
- (opCh) -> useDirectMapping(tx0, ctx,
opCh) ? enlistDirect(tx0, ch, opCh, ctx, opCode)
- : nullCompletedFuture(),
+ (opCh) -> {
+ if (tx0 != null &&
tx0.hasCommitPartition()
+ // If a request is
colocated with a coordinator, it's executed in proxy mode.
+ &&
!tx0.nodeName().equals(opCh.protocolContext().clusterNode().name())) {
+ ctx.pm = pm;
+ return enlistDirect(tx0, ch,
opCh, ctx, opCode);
+ } else {
+ return nullCompletedFuture();
+ }
+ },
w -> writer.accept(schema, w, ctx),
r -> readSchemaAndReadData(schema, r,
reader, defaultValue, responseSchemaRequired, ctx, tx0),
- resolvePreferredNode(tx0, ctx.pm),
- tx0 == null ? null : tx0.nodeName(),
+ () -> ctx.firstReqFut != null ?
completedFuture(ctx.channel)
+ :
ch.getChannelAsync(resolvePreferredNode(tx0, pm)),
retryPolicyOverride,
expectNotifications)
// Read resulting schema and the rest of the
response.
@@ -596,13 +648,10 @@ public class ClientTable implements Table {
}
}
- private static boolean useDirectMapping(@Nullable ClientTransaction tx0,
WriteContext ctx, ClientChannel opChannel) {
- // Fulfilling this condition forces proxy mode.
- boolean proxy = tx0 == null || tx0.isReadOnly() || ctx.pm == null ||
!opChannel.protocolContext()
- .allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
-
- return !proxy && ctx.pm != null &&
ctx.pm.nodeConsistentId().equals(opChannel.protocolContext().clusterNode().name())
- && tx0.hasCommitPartition();
+ private static boolean isProxy(@Nullable Transaction tx, @Nullable
PartitionMapping pm, ClientChannel opChannel) {
+ return tx == null || tx.isReadOnly() || pm == null
+ ||
!opChannel.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS, TX_PIGGYBACK)
+ ||
!pm.nodeConsistentId().equals(opChannel.protocolContext().clusterNode().name());
}
private static CompletableFuture<Void> enlistDirect(
@@ -631,20 +680,32 @@ public class ClientTable implements Table {
@Nullable T defaultValue,
boolean responseSchemaRequired,
WriteContext ctx,
- @Nullable ClientTransaction tx
+ @Nullable ClientTransaction tx0
) {
- // Use enlistment meta only for remote transactions.
- if (ctx.enlistmentToken != null) {
- assert tx != null;
+ ClientMessageUnpacker in1 = in.in();
+ if (ctx.firstReqFut != null) {
+ assert tx0 == null;
+
+ long id = in1.unpackLong();
+ UUID txId = in1.unpackUuid();
+ UUID coordId = in1.unpackUuid();
+ long timeout = in1.unpackLong();
+
+ ClientTransaction tx =
+ new ClientTransaction(in.clientChannel(), id,
ctx.readOnly, txId, ctx.pm, coordId, ch.observableTimestamp(), timeout);
+
+ ctx.firstReqFut.complete(tx);
+ } else if (ctx.enlistmentToken != null) { // Use enlistment meta only
for remote transactions.
+ assert tx0 != null;
assert ctx.pm != null;
if (in.in().tryUnpackNil()) {
// No-op.
- in.clientChannel().inflights().removeInflight(tx.txId(), null);
+ in.clientChannel().inflights().removeInflight(tx0.txId(),
null);
// Finish enlist on first request only.
if (ctx.enlistmentToken == 0) {
- tx.tryFinishEnlist(ctx.pm, null, 0, true);
+ tx0.tryFinishEnlist(ctx.pm, null, 0, true);
}
} else {
String consistentId = in.in().unpackString();
@@ -652,12 +713,12 @@ public class ClientTable implements Table {
// Finish enlist on first request only.
if (ctx.enlistmentToken == 0) {
- tx.tryFinishEnlist(ctx.pm, consistentId, token, false);
+ tx0.tryFinishEnlist(ctx.pm, consistentId, token, false);
}
}
}
- int schemaVer = in.in().unpackInt();
+ int schemaVer = in1.unpackInt();
if (!responseSchemaRequired) {
ensureSchemaLoadedAsync(schemaVer);
@@ -665,7 +726,7 @@ public class ClientTable implements Table {
return fn.apply(null, in);
}
- if (in.in().tryUnpackNil()) {
+ if (in1.tryUnpackNil()) {
ensureSchemaLoadedAsync(schemaVer);
return defaultValue;
@@ -679,7 +740,7 @@ public class ClientTable implements Table {
// Schema is not yet known - request.
// Retain unpacker - normally it is closed when this method exits.
- in.in().retain();
+ in1.retain();
return new IgniteBiTuple<>(in, schemaVer);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 9ef1fcfecd8..b905b61ef96 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.client.tx;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.client.tx.ClientTransactions.USE_CONFIGURED_TIMEOUT_DEFAULT;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-import org.apache.ignite.internal.client.PartitionMapping;
+import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
@@ -97,6 +98,10 @@ public class ClientLazyTransaction implements Transaction {
return options != null && options.readOnly();
}
+ public long timeout() {
+ return options == null ? USE_CONFIGURED_TIMEOUT_DEFAULT :
options.timeoutMillis();
+ }
+
/**
* Gets the node name of the node where the transaction is started. If not
started yet, returns {@code null}.
*
@@ -135,41 +140,52 @@ public class ClientLazyTransaction implements Transaction
{
*
* @param tx Transaction.
* @param ch Channel.
- * @param sup Partition mapping supplier.
- * @return Future that will be completed when the transaction is started.
+ *
+ * @return Future that will be completed when the transaction is started
and first request flag.
*/
- public static CompletableFuture<ClientTransaction> ensureStarted(
- @Nullable Transaction tx,
- ReliableChannel ch,
- @Nullable Supplier<PartitionMapping> sup
+ public static IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean>
ensureStarted(
+ Transaction tx,
+ ReliableChannel ch
) {
- if (tx == null) {
- return nullCompletedFuture();
- }
+ return ensureStarted(tx, ch, () -> ch.getChannelAsync(null));
+ }
+ /**
+ * Ensures that the underlying transaction is actually started on the
server.
+ *
+ * @param tx Transaction.
+ * @param ch Channel.
+ * @param channelResolver Client channel resolver. {@code null} value
means skipping explicit tx begin request.
+ *
+ * @return Future that will be completed when the transaction is started
and first request flag.
+ */
+ public static IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean>
ensureStarted(
+ Transaction tx,
+ ReliableChannel ch,
+ @Nullable Supplier<CompletableFuture<ClientChannel>>
channelResolver
+ ) {
if (!(tx instanceof ClientLazyTransaction)) {
throw ClientTransaction.unsupportedTxTypeException(tx);
}
- return ((ClientLazyTransaction) tx).ensureStarted(ch, sup);
+ return ((ClientLazyTransaction) tx).ensureStarted(ch, channelResolver);
}
- private synchronized CompletableFuture<ClientTransaction> ensureStarted(
+ private synchronized IgniteBiTuple<CompletableFuture<ClientTransaction>,
Boolean> ensureStarted(
ReliableChannel ch,
- @Nullable Supplier<PartitionMapping> sup
+ @Nullable Supplier<CompletableFuture<ClientChannel>>
channelResolver
) {
var tx0 = tx;
if (tx0 != null) {
- return tx0;
+ return new IgniteBiTuple<>(tx0, false);
}
- @Nullable PartitionMapping pm = sup == null ? null : sup.get();
-
- tx0 = ClientTransactions.beginAsync(ch, pm, options,
observableTimestamp);
+ tx0 = channelResolver != null ? ClientTransactions.beginAsync(ch,
options, observableTimestamp, channelResolver)
+ : new CompletableFuture<>();
tx = tx0;
- return tx0;
+ return new IgniteBiTuple<>(tx0, channelResolver == null);
}
/**
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 2d9d2d33b96..7cb9fb508d4 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.tx;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ViewUtils.sync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
@@ -135,9 +136,7 @@ public class ClientTransaction implements Transaction {
this.timeout = timeout;
if (cpm != null) {
- // If mapping is known, assign commit partition.
- // However, we don't require direct connection to a commit
partition primary replica here because where is a guarantee that
- // commit partition will be assigned to provided value at the txn
beginning.
+ // if commit partition is known, we can attempt to do direct
mappings in this transaction.
this.commitTableId = cpm.tableId();
this.commitPartition = cpm.partition();
} else {
@@ -220,18 +219,22 @@ public class ClientTransaction implements Transaction {
enlistPartitionLock.writeLock().unlock();
}
- CompletableFuture<Void> finishFut =
ch.inflights().finishFuture(txId());
+ boolean enabled =
ch.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS);
+ CompletableFuture<Void> finishFut = enabled ?
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
w.out().packLong(id);
- if (!isReadOnly &&
w.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS)) {
- w.out().packLong(tracker.get().longValue());
+
+ if (!isReadOnly && enabled) {
w.out().packInt(enlisted.size());
- for (Entry<TablePartitionId,
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
- w.out().packInt(entry.getKey().tableId());
- w.out().packInt(entry.getKey().partitionId());
- w.out().packString(entry.getValue().getNow(null).get1());
- w.out().packLong(entry.getValue().getNow(null).get2());
+ if (!enlisted.isEmpty()) {
+ for (Entry<TablePartitionId,
CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
+ w.out().packInt(entry.getKey().tableId());
+ w.out().packInt(entry.getKey().partitionId());
+
w.out().packString(entry.getValue().getNow(null).get1());
+ w.out().packLong(entry.getValue().getNow(null).get2());
+ }
+ w.out().packLong(tracker.get().longValue());
}
}
}, r -> null));
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index 1b5fbdd3032..7b8ea671b17 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -17,14 +17,14 @@
package org.apache.ignite.internal.client.tx;
-import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
-import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.client.tx.ClientTransaction.EMPTY;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ViewUtils.sync;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.client.PartitionMapping;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class ClientTransactions implements IgniteTransactions {
/** 0 timeout is used as a flag to use the configured timeout. */
- private static final int USE_CONFIGURED_TIMEOUT_DEFAULT = 0;
+ public static final int USE_CONFIGURED_TIMEOUT_DEFAULT = 0;
/** Channel. */
private final ReliableChannel ch;
@@ -63,32 +63,38 @@ public class ClientTransactions implements
IgniteTransactions {
/** {@inheritDoc} */
@Override
public CompletableFuture<Transaction> beginAsync(@Nullable
TransactionOptions options) {
- return CompletableFuture.completedFuture(new
ClientLazyTransaction(ch.observableTimestamp(), options));
+ return completedFuture(new
ClientLazyTransaction(ch.observableTimestamp(), options));
}
+ /**
+ * Begins the transaction on any node.
+ *
+ * @param ch Reliable channel.
+ * @param options The options.
+ * @param observableTimestamp The timestamp.
+ * @param channelResolver Client channel resolver.
+ *
+ * @return The future.
+ */
static CompletableFuture<ClientTransaction> beginAsync(
ReliableChannel ch,
- @Nullable PartitionMapping pm,
@Nullable TransactionOptions options,
- HybridTimestampTracker observableTimestamp
+ HybridTimestampTracker observableTimestamp,
+ Supplier<CompletableFuture<ClientChannel>> channelResolver
) {
boolean readOnly = options != null && options.readOnly();
long timeout = options == null ? USE_CONFIGURED_TIMEOUT_DEFAULT :
options.timeoutMillis();
return ch.serviceAsync(
ClientOp.TX_BEGIN,
+ ch0 -> nullCompletedFuture(),
w -> {
w.out().packBoolean(readOnly);
w.out().packLong(timeout);
w.out().packLong(observableTimestamp.get().longValue());
- if (!readOnly &&
w.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS)) {
- w.out().packInt(pm == null ? -1 : pm.tableId());
- w.out().packInt(pm == null ? -1 : pm.partition());
- }
},
- r -> readTx(r, readOnly, pm, observableTimestamp, timeout),
- pm == null ? null : pm.nodeConsistentId(),
- null,
+ r -> readTx(r, readOnly, timeout),
+ channelResolver,
null,
false);
}
@@ -96,19 +102,12 @@ public class ClientTransactions implements
IgniteTransactions {
private static ClientTransaction readTx(
PayloadInputChannel r,
boolean isReadOnly,
- @Nullable PartitionMapping pm,
- HybridTimestampTracker tracker,
long timeout
) {
ClientMessageUnpacker in = r.in();
long id = in.unpackLong();
- if (isReadOnly ||
!r.clientChannel().protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS)) {
- return new ClientTransaction(r.clientChannel(), id, isReadOnly,
EMPTY, null, EMPTY, null, timeout);
- } else {
- UUID txId = in.unpackUuid();
- UUID coordId = in.unpackUuid();
- return new ClientTransaction(r.clientChannel(), id, isReadOnly,
txId, pm, coordId, tracker, timeout);
- }
+
+ return new ClientTransaction(r.clientChannel(), id, isReadOnly, EMPTY,
null, EMPTY, null, timeout);
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index ede65c51059..2aa1124553c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -138,7 +138,7 @@ public class ConnectionTest extends AbstractClientTest {
FakeIgnite ignite = new FakeIgnite(nodeName);
try (TestServer testServer =
- new TestServer(0, ignite, null, null, nodeName,
UUID.randomUUID(), null, null, null, false)) {
+ new TestServer(0, ignite, null, null, nodeName,
UUID.randomUUID(), null, null, null, false, null)) {
Builder clientBuilder = IgniteClient.builder()
.addresses("127.0.0.1:" + testServer.port())
@@ -166,7 +166,7 @@ public class ConnectionTest extends AbstractClientTest {
FakeIgnite ignite = new FakeIgnite(nodeName);
try (TestServer testServer =
- new TestServer(0, ignite, null, null, nodeName,
UUID.randomUUID(), null, null, null, false)) {
+ new TestServer(0, ignite, null, null, nodeName,
UUID.randomUUID(), null, null, null, false, null)) {
Builder clientBuilder = IgniteClient.builder()
.addresses("127.0.0.1:" + testServer.port())
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
new file mode 100644
index 00000000000..dcb27b1d457
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that after handshake client and servers mutual features are in
expected state.
+ */
+public class FeatureCompatibilityTest extends BaseIgniteAbstractTest {
+ private TestServer testServer;
+
+ private FakeIgnite ignite;
+
+ private IgniteClient client;
+
+ private void startServer(@Nullable BitSet features) {
+ TestHybridClock clock = new TestHybridClock(System::currentTimeMillis);
+
+ ignite = new FakeIgnite("server-1");
+ testServer = new TestServer(0, ignite, reqId -> false, null,
"server-1", UUID.randomUUID(), null, null, clock, true, features);
+
+ client = IgniteClient.builder().addresses("127.0.0.1:" +
testServer.port()).build();
+ }
+
+ private void stopServer() throws Exception {
+ closeAll(client, testServer);
+ }
+
+ @Test
+ public void testDirectMappingEnabled() throws Exception {
+ startServer(null);
+
+ try {
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
+
+ ClientChannel ch0 = ch.getChannelAsync(null).join();
+
+
assertTrue(ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING,
TX_DELAYED_ACKS, TX_PIGGYBACK));
+ } finally {
+ stopServer();
+ }
+ }
+
+ @Test
+ public void testDirectMappingDisabled() throws Exception {
+ BitSet features = new BitSet(ProtocolBitmaskFeature.values().length);
+ features.set(TX_DIRECT_MAPPING.featureId());
+ startServer(features);
+
+ try {
+ ReliableChannel ch = ((TcpIgniteClient) client).channel();
+
+ ClientChannel ch0 = ch.getChannelAsync(null).join();
+
+
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING));
+
assertFalse(ch0.protocolContext().isFeatureSupported(TX_DELAYED_ACKS));
+
assertFalse(ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK));
+ } finally {
+ stopServer();
+ }
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
index 09275b81a49..2b8ee6d2953 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -51,7 +51,7 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
TestHybridClock clock = new
TestHybridClock(currentServerTimestamp::get);
ignite = new FakeIgnite("server-2");
- testServer = new TestServer(0, ignite, null, null, "server-2",
UUID.randomUUID(), null, null, clock, true);
+ testServer = new TestServer(0, ignite, null, null, "server-2",
UUID.randomUUID(), null, null, clock, true, null);
client = IgniteClient.builder().addresses("127.0.0.1:" +
testServer.port()).build();
}
@@ -74,27 +74,27 @@ public class ObservableTimestampPropagationTest extends
BaseIgniteAbstractTest {
// RW TX does not propagate timestamp.
var rwTx = client.transactions().begin();
- ClientLazyTransaction.ensureStarted(rwTx, ch, null).join();
+ ClientLazyTransaction.ensureStarted(rwTx, ch).get1().join();
// RO TX propagates timestamp.
var roTx = client.transactions().begin(roOpts);
- ClientLazyTransaction.ensureStarted(roTx, ch, null).join();
+ ClientLazyTransaction.ensureStarted(roTx, ch).get1().join();
assertEquals(1, lastObservableTimestamp(ch));
// Increase timestamp on server - client does not know about it
initially.
currentServerTimestamp.set(11);
-
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch,
null).join();
+
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts),
ch).get1().join();
assertEquals(1, lastObservableTimestamp(ch));
// Subsequent RO TX propagates latest known timestamp.
client.tables().tables();
-
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch,
null).join();
+
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts),
ch).get1().join();
assertEquals(11, lastObservableTimestamp(ch));
// Smaller timestamp from server is ignored by client.
currentServerTimestamp.set(9);
-
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch,
null).join();
-
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts), ch,
null).join();
+
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts),
ch).get1().join();
+
ClientLazyTransaction.ensureStarted(client.transactions().begin(roOpts),
ch).get1().join();
assertEquals(11, lastObservableTimestamp(ch));
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 86743153c48..e2f6d670106 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -104,7 +104,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
try (var client = getClient(plc)) {
Transaction tx = client.transactions().begin();
- ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient)
client).channel(), null).join();
+ ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient)
client).channel()).get1().join();
assertThrows(IgniteClientConnectionException.class, tx::commit);
assertEquals(0, plc.invocations.size());
@@ -167,7 +167,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
try (var client = getClient(plc)) {
RecordView<Tuple> recView =
client.tables().table("t").recordView();
Transaction tx = client.transactions().begin();
- ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient)
client).channel(), null).join();
+ ClientLazyTransaction.ensureStarted(tx, ((TcpIgniteClient)
client).channel()).get1().join();
var ex = assertThrows(IgniteException.class, () -> recView.get(tx,
Tuple.create().set("id", 1)));
assertThat(ex.getMessage(), containsString("Transaction context
has been lost due to connection errors."));
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index 36036ce631d..55df8413910 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -110,6 +110,9 @@ public class TestClientHandlerModule implements
IgniteComponent {
/** Configuration of the client connector. */
private final ClientConnectorConfiguration clientConnectorConfiguration;
+ /** Features set. */
+ private final BitSet features;
+
/**
* Constructor.
*
@@ -124,6 +127,7 @@ public class TestClientHandlerModule implements
IgniteComponent {
* @param clock Clock.
* @param placementDriver Placement driver.
* @param clientConnectorConfiguration Configuration of the client
connector.
+ * @param features Features.
*/
public TestClientHandlerModule(
Ignite ignite,
@@ -136,7 +140,8 @@ public class TestClientHandlerModule implements
IgniteComponent {
AuthenticationManager authenticationManager,
HybridClock clock,
PlacementDriver placementDriver,
- ClientConnectorConfiguration clientConnectorConfiguration
+ ClientConnectorConfiguration clientConnectorConfiguration,
+ @Nullable BitSet features
) {
assert ignite != null;
assert bootstrapFactory != null;
@@ -153,6 +158,7 @@ public class TestClientHandlerModule implements
IgniteComponent {
this.clock = clock;
this.placementDriver = placementDriver;
this.clientConnectorConfiguration = clientConnectorConfiguration;
+ this.features = features;
}
/** {@inheritDoc} */
@@ -212,10 +218,20 @@ public class TestClientHandlerModule implements
IgniteComponent {
ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
- BitSet features = BitSet.valueOf(new
long[]{ThreadLocalRandom.current().nextLong()});
- features.set(ProtocolBitmaskFeature.TX_DIRECT_MAPPING.featureId());
- features.set(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB.featureId());
- features.set(ProtocolBitmaskFeature.TX_DELAYED_ACKS.featureId());
+ BitSet features;
+
+ if (this.features == null) {
+ features = BitSet.valueOf(new
long[]{ThreadLocalRandom.current().nextLong()});
+ features.set(ProtocolBitmaskFeature.TX_DIRECT_MAPPING.featureId());
+
features.set(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB.featureId());
+ features.set(ProtocolBitmaskFeature.TX_DELAYED_ACKS.featureId());
+ features.set(ProtocolBitmaskFeature.TX_PIGGYBACK.featureId());
+ } else {
+ features = new BitSet(ProtocolBitmaskFeature.values().length);
+ for (int i = this.features.nextSetBit(0); i != -1; i =
this.features.nextSetBit(i + 1)) {
+ features.set(i);
+ }
+ }
bootstrap.childHandler(new ChannelInitializer<>() {
@Override
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 34194725490..dfb66ff1e75 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
+import java.util.BitSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@@ -150,7 +151,8 @@ public class TestServer implements AutoCloseable {
securityConfiguration,
port,
null,
- true
+ true,
+ null
);
}
@@ -170,7 +172,8 @@ public class TestServer implements AutoCloseable {
@Nullable SecurityConfiguration securityConfiguration,
@Nullable Integer port,
@Nullable HybridClock clock,
- boolean enableRequestHandling
+ boolean enableRequestHandling,
+ @Nullable BitSet features
) {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
@@ -262,7 +265,8 @@ public class TestServer implements AutoCloseable {
authenticationManager,
clock,
ignite.placementDriver(),
- clientConnectorConfiguration)
+ clientConnectorConfiguration,
+ features)
: new ClientHandlerModule(
ignite.queryEngine(),
(IgniteTablesInternal) ignite.tables(),
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index 6816ef2c891..3cf582e5785 100644
---
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.hlc.HybridTimestampTracker.EMPTY_TS_PROVIDER;
+import static
org.apache.ignite.internal.hlc.HybridTimestampTracker.emptyTracker;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -220,7 +221,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
tx.commit();
- WriteContext wc = new WriteContext();
+ WriteContext wc = new WriteContext(emptyTracker());
wc.pm = pm;
try {
@@ -251,7 +252,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
tx.rollback();
- WriteContext wc = new WriteContext();
+ WriteContext wc = new WriteContext(emptyTracker());
wc.pm = pm;
try {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index e470017f652..07497f463dd 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.runner.app.client;
+import static java.util.Collections.emptyList;
import static java.util.Comparator.comparing;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -467,8 +468,9 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
static List<Tuple> generateKeysForNode(int start, int count,
Map<Partition, ClusterNode> map, ClusterNode clusterNode, Table table) {
String clusterNodeName = clusterNode.name();
- assert map.values().stream().anyMatch(x -> Objects.equals(x.name(),
clusterNodeName)) :
- "No partitions for node: " + clusterNodeName;
+ if (map.values().stream().noneMatch(x -> Objects.equals(x.name(),
clusterNodeName))) {
+ return emptyList();
+ }
List<Tuple> keys = new ArrayList<>();
PartitionManager partitionManager = table.partitionManager();
@@ -755,6 +757,38 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
}
}
+ @Test
+ void testExplicitReadOnlyTransaction() {
+ ClientTable table = (ClientTable) table();
+
+ KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
+
+ // Load partition map to ensure all entries are directly mapped.
+ Map<Partition, ClusterNode> map =
table.partitionManager().primaryReplicasAsync().join();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(600, 20, map,
server0.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples1 = generateKeysForNode(600, 20, map,
server1.clusterService().topologyService().localMember(), table);
+
+ Tuple k1 = tuples0.get(0);
+ Tuple v1 = val(tuples0.get(0).intValue(0) + "");
+
+ Tuple k2 = tuples1.get(1);
+ Tuple v2 = val(tuples1.get(1).intValue(0) + "");
+
+ kvView.put(null, k1, v1);
+ kvView.put(null, k2, v2);
+
+ Transaction tx = client().transactions().begin(new
TransactionOptions().readOnly(true));
+
+ assertTrue(Tuple.equals(v1, kvView.get(tx, k1)));
+ assertTrue(Tuple.equals(v2, kvView.get(tx, k2)));
+
+ tx.commit();
+ }
+
private KeyValueView<Integer, String> kvView() {
return table().keyValueView(Mapper.of(Integer.class),
Mapper.of(String.class));
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
index a2c4d18c47f..fd9458925ba 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.partition.Partition;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -43,7 +42,6 @@ import org.junit.jupiter.api.Test;
*/
public class ItThinClientTransactionsWithReplicasTest extends
ItAbstractThinClientTest {
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-25431")
void testStaleMapping() {
Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().orTimeout(9,
TimeUnit.SECONDS).join();
@@ -57,6 +55,10 @@ public class ItThinClientTransactionsWithReplicasTest
extends ItAbstractThinClie
List<Tuple> tuples1 = generateKeysForNode(100, 1, map,
server1.clusterService().topologyService().localMember(), table);
List<Tuple> tuples2 = generateKeysForNode(100, 1, map,
server2.clusterService().topologyService().localMember(), table);
+ if (tuples0.isEmpty() || tuples1.isEmpty() || tuples2.isEmpty()) {
+ return; // Skip the test if assignments are bad.
+ }
+
Transaction tx0 = client().transactions().begin();
KeyValueView<Tuple, Tuple> view = table.keyValueView();