This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 372b1ba8428 IGNITE-27202 Fix an issue with a wrong commit partition ID for
directly mapped transactions. (#7105)
372b1ba8428 is described below
commit 372b1ba842873a1340523d168659d841f7c0255a
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Nov 28 17:35:44 2025 +0200
IGNITE-27202 Fix an issue with a wrong commit partition ID for directly mapped
transactions. (#7105)
---
.../handler/ClientInboundMessageHandler.java | 2 +-
.../requests/sql/ClientSqlExecuteBatchRequest.java | 14 ++-
.../requests/sql/ClientSqlExecuteRequest.java | 17 ++-
.../sql/ClientSqlQueryMetadataRequest.java | 8 +-
.../handler/requests/table/ClientTableCommon.java | 133 +++++++++++++++------
.../requests/table/ClientTupleRequestBase.java | 16 ++-
.../requests/table/ClientTuplesRequestBase.java | 8 +-
.../apache/ignite/client/fakes/FakeTxManager.java | 14 ++-
.../replicator/PartitionReplicaListener.java | 6 +
.../replication/PartitionReplicaListenerTest.java | 27 ++---
.../org/apache/ignite/internal/tx/TxManager.java | 4 +-
.../tx/impl/RemoteReadWriteTransaction.java | 8 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 12 +-
13 files changed, 184 insertions(+), 85 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 3dc893edaf6..01dea112d25 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -964,7 +964,7 @@ public class ClientInboundMessageHandler
return ClientSqlExecuteRequest.process(
partitionOperationsExecutor, in, requestId,
cancelHandles, queryProcessor, resources, metrics, tsTracker,
clientContext.hasFeature(SQL_PARTITION_AWARENESS),
clientContext.hasFeature(SQL_DIRECT_TX_MAPPING), txManager,
- clockService, notificationSender(requestId),
resolveCurrentUsername(),
+ igniteTables, clockService,
notificationSender(requestId), resolveCurrentUsername(),
clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT)
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index fd7429b568b..03c353061b0 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.client.handler.requests.sql;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -65,7 +64,16 @@ public class ClientSqlExecuteBatchRequest {
CancelHandle cancelHandle = CancelHandle.create();
cancelHandleMap.put(requestId, cancelHandle);
- InternalTransaction tx = readTx(in, tsTracker, resources, null, null,
null);
+ CompletableFuture<InternalTransaction> txFut = readTx(
+ in,
+ tsTracker,
+ resources,
+ null,
+ null,
+ null,
+ null
+ );
+
ClientSqlProperties props = new ClientSqlProperties(in, false);
String statement = in.unpackString();
BatchedArguments arguments = readArgs(in);
@@ -73,7 +81,7 @@ public class ClientSqlExecuteBatchRequest {
HybridTimestamp clientTs =
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
tsTracker.update(clientTs);
- return nullCompletedFuture().thenComposeAsync(none -> {
+ return txFut.thenComposeAsync(tx -> {
return IgniteSqlImpl.executeBatchCore(
sql,
tsTracker,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 418c9238d5e..42390807490 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.ArrayList;
import java.util.List;
@@ -50,6 +49,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -92,6 +92,7 @@ public class ClientSqlExecuteRequest {
boolean sqlPartitionAwarenessSupported,
boolean sqlDirectTxMappingSupported,
TxManager txManager,
+ IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
@Nullable String username,
@@ -105,7 +106,17 @@ public class ClientSqlExecuteRequest {
}
long[] resIdHolder = {0};
- InternalTransaction tx = readTx(in, timestampTracker, resources,
txManager, notificationSender, resIdHolder);
+
+ CompletableFuture<InternalTransaction> txFut = readTx(
+ in,
+ timestampTracker,
+ resources,
+ txManager,
+ tables,
+ notificationSender,
+ resIdHolder
+ );
+
ClientSqlProperties props = new ClientSqlProperties(in,
sqlMultistatementsSupported);
String statement = in.unpackString();
Object[] arguments = readArgsNotNull(in);
@@ -115,7 +126,7 @@ public class ClientSqlExecuteRequest {
boolean includePartitionAwarenessMeta = sqlPartitionAwarenessSupported
&& in.unpackBoolean();
- return nullCompletedFuture().thenComposeAsync(none -> executeAsync(
+ return txFut.thenComposeAsync(tx -> executeAsync(
tx,
sql,
timestampTracker,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 4437b70536b..49db22c22bb 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.client.handler.requests.sql;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -30,6 +29,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlProperties;
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
+import org.apache.ignite.internal.tx.InternalTransaction;
/**
* Client SQL request for the parameter metadata.
@@ -51,15 +51,15 @@ public class ClientSqlQueryMetadataRequest {
ClientResourceRegistry resources,
HybridTimestampTracker tsTracker
) {
- var tx = readTx(in, tsTracker, resources, null, null, null);
+ CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker,
resources, null, null, null, null);
String schema = in.unpackString();
String query = in.unpackString();
SqlProperties properties = new SqlProperties().defaultSchema(schema);
- return nullCompletedFuture()
- .thenComposeAsync(none ->
processor.prepareSingleAsync(properties, tx, query)
+ return txFut
+ .thenComposeAsync(tx ->
processor.prepareSingleAsync(properties, tx, query)
.thenApply(meta -> out -> writeMeta(out, meta)),
operationExecutor);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 60e17fb31c4..c3301a7fb86 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -17,10 +17,12 @@
package org.apache.ignite.client.handler.requests.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.client.proto.ClientMessageCommon.NO_VALUE;
import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
@@ -46,7 +48,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -172,7 +174,8 @@ public class ClientTableCommon {
static void writeTuples(
ClientMessagePacker packer,
Collection<Tuple> tuples,
- SchemaRegistry schemaRegistry) {
+ SchemaRegistry schemaRegistry
+ ) {
writeTuples(packer, tuples, TuplePart.KEY_AND_VAL, schemaRegistry);
}
@@ -247,7 +250,12 @@ public class ClientTableCommon {
}
public static CompletableFuture<Tuple> readTuple(
- int schemaId, BitSet noValueSet, byte[] tupleBytes,
TableViewInternal table, boolean keyOnly) {
+ int schemaId,
+ BitSet noValueSet,
+ byte[] tupleBytes,
+ TableViewInternal table,
+ boolean keyOnly
+ ) {
return readSchema(schemaId, table).thenApply(schema ->
readTuple(noValueSet, tupleBytes, keyOnly, schema));
}
@@ -301,7 +309,7 @@ public class ClientTableCommon {
// This method can return a table that is being stopped, but it's
not a problem - any operation on such table will fail.
TableViewInternal cachedTable =
tablesInternal.cachedTable(tableId);
if (cachedTable != null) {
- return CompletableFuture.completedFuture(cachedTable);
+ return completedFuture(cachedTable);
}
return tablesInternal.tableAsync(tableId)
@@ -326,7 +334,11 @@ public class ClientTableCommon {
* @param req Request.
*/
static void writeTxMeta(
- ClientMessagePacker out, HybridTimestampTracker tsTracker,
@Nullable ClockService clockService, ClientTupleRequestBase req) {
+ ClientMessagePacker out,
+ HybridTimestampTracker tsTracker,
+ @Nullable ClockService clockService,
+ ClientTupleRequestBase req
+ ) {
writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
}
@@ -339,7 +351,11 @@ public class ClientTableCommon {
* @param req Request.
*/
static void writeTxMeta(
- ClientMessagePacker out, HybridTimestampTracker tsTracker,
@Nullable ClockService clockService, ClientTuplesRequestBase req) {
+ ClientMessagePacker out,
+ HybridTimestampTracker tsTracker,
+ @Nullable ClockService clockService,
+ ClientTuplesRequestBase req
+ ) {
writeTxMeta(out, tsTracker, clockService, req.tx(), req.resourceId());
}
@@ -352,8 +368,13 @@ public class ClientTableCommon {
* @param tx Transaction.
* @param resourceId Resource id.
*/
- public static void writeTxMeta(ClientMessagePacker out,
HybridTimestampTracker tsTracker, @Nullable ClockService clockService,
- InternalTransaction tx, long resourceId) {
+ public static void writeTxMeta(
+ ClientMessagePacker out,
+ HybridTimestampTracker tsTracker,
+ @Nullable ClockService clockService,
+ InternalTransaction tx,
+ long resourceId
+ ) {
if (resourceId != 0) {
// Resource id is assigned on a first request in direct mode.
out.packLong(resourceId);
@@ -393,14 +414,25 @@ public class ClientTableCommon {
* @param resourceIdHolder Resource id holder.
* @return Transaction, if present, or null.
*/
- public static @Nullable InternalTransaction readTx(
+ public static CompletableFuture<@Nullable InternalTransaction> readTx(
ClientMessageUnpacker in,
HybridTimestampTracker tsUpdater,
ClientResourceRegistry resources,
@Nullable TxManager txManager,
+ @Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
- long[] resourceIdHolder) {
- return readTx(in, tsUpdater, resources, txManager, notificationSender,
resourceIdHolder, EnumSet.noneOf(RequestOptions.class));
+ long[] resourceIdHolder
+ ) {
+ return readTx(
+ in,
+ tsUpdater,
+ resources,
+ txManager,
+ tables,
+ notificationSender,
+ resourceIdHolder,
+ EnumSet.noneOf(RequestOptions.class)
+ );
}
/**
@@ -415,21 +447,26 @@ public class ClientTableCommon {
* @param options Request options. Defines how a request is processed.
* @return Transaction, if present, or null.
*/
- public static @Nullable InternalTransaction readTx(
+ public static CompletableFuture<@Nullable InternalTransaction> readTx(
ClientMessageUnpacker in,
HybridTimestampTracker tsUpdater,
ClientResourceRegistry resources,
@Nullable TxManager txManager,
+ @Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder,
- EnumSet<RequestOptions> options) {
+ EnumSet<RequestOptions> options
+ ) {
if (in.tryUnpackNil()) {
- return null;
+ return nullCompletedFuture();
}
try {
long id = in.unpackLong();
+
if (id == TX_ID_FIRST_DIRECT) {
+ assert txManager != null : "Transaction manager must be
specified to process directly mapped requests.";
+
// This is first mapping request, which piggybacks transaction
creation.
long observableTs = in.unpackLong();
@@ -457,29 +494,43 @@ public class ClientTableCommon {
// Attach resource id only on first direct request.
resourceIdHolder[0] = resources.put(new ClientResource(tx,
tx::rollbackAsync));
- return tx;
+ return completedFuture(tx);
} else if (id == TX_ID_DIRECT) {
+ assert txManager != null : "Transaction manager must be
specified to process directly mapped requests.";
+ assert tables != null : "Tables manager must be specified to
process directly mapped requests.";
+
// This is direct request mapping.
long token = in.unpackLong();
UUID txId = in.unpackUuid();
- int commitTableId = in.unpackInt();
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-25017
+ // partition awareness feature should be reworked to use zone
ids.
+ int tableId = in.unpackInt();
int commitPart = in.unpackInt();
+
UUID coord = in.unpackUuid();
long timeout = in.unpackLong();
- InternalTransaction remote = txManager.beginRemote(txId, new
TablePartitionId(commitTableId, commitPart),
- coord, token, timeout, err -> {
- // Will be called for write txns.
- notificationSender.sendNotification(w ->
w.packUuid(txId), err, NULL_HYBRID_TIMESTAMP);
+ return readTableAsync(tableId, tables)
+ .thenApply(table -> {
+ InternalTransaction remote =
txManager.beginRemote(txId, new ZonePartitionId(table.zoneId(), commitPart),
+ coord, token, timeout, err -> {
+ // Need clarification: why might
this listener be null when processing SQL queries?
+ // See
ClientSqlExecuteRequest#process
+ if (notificationSender != null) {
+ // Will be called for write
txns.
+
notificationSender.sendNotification(w -> w.packUuid(txId), err,
NULL_HYBRID_TIMESTAMP);
+ }
+ });
+
+ // Remote transaction will be synchronously rolled
back if the timeout has exceeded.
+ if (remote.isRolledBackWithTimeoutExceeded()) {
+ throw new
TransactionException(TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR,
+ "Transaction is already finished [tx="
+ remote + "].");
+ }
+
+ return remote;
});
-
- // Remote transaction will be synchronously rolled back if the
timeout has exceeded.
- if (remote.isRolledBackWithTimeoutExceeded()) {
- throw new
TransactionException(TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR,
- "Transaction is already finished [tx=" + remote +
"].");
- }
-
- return remote;
}
var tx = resources.get(id).get(InternalTransaction.class);
@@ -490,29 +541,33 @@ public class ClientTableCommon {
tsUpdater.update(tx.readTimestamp()); // TODO
https://issues.apache.org/jira/browse/IGNITE-24592
}
- return tx;
+ return completedFuture(tx);
} catch (IgniteInternalCheckedException e) {
throw new IgniteException(e.traceId(), e.code(), e.getMessage(),
e);
}
}
- static InternalTransaction readOrStartImplicitTx(
+ static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
ClientMessageUnpacker in,
HybridTimestampTracker readTs,
ClientResourceRegistry resources,
TxManager txManager,
+ IgniteTables tables,
EnumSet<RequestOptions> options,
@Nullable NotificationSender notificationSender,
- long[] resourceIdHolder) {
- InternalTransaction tx = readTx(in, readTs, resources, txManager,
notificationSender, resourceIdHolder, options);
-
- if (tx == null) {
- // Implicit transactions do not use an observation timestamp
because RW never depends on it, and implicit RO is always direct.
- // The direct transaction uses a current timestamp on the primary
replica by definition.
- tx = startImplicitTx(readTs, txManager,
options.contains(RequestOptions.READ_ONLY));
- }
+ long[] resourceIdHolder
+ ) {
+ return readTx(in, readTs, resources, txManager, tables,
notificationSender, resourceIdHolder, options)
+ .thenApply(tx -> {
+ if (tx == null) {
+ // Implicit transactions do not use an observation
timestamp because RW never depends on it,
+ // and implicit RO is always direct.
+ // The direct transaction uses a current timestamp on
the primary replica by definition.
+ tx = startImplicitTx(readTs, txManager,
options.contains(RequestOptions.READ_ONLY));
+ }
- return tx;
+ return tx;
+ });
}
/**
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
index 9787ef0a055..eefed432cd8 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
@@ -97,7 +97,16 @@ class ClientTupleRequestBase {
long[] resIdHolder = {0};
- InternalTransaction tx = readOrStartImplicitTx(in, tsTracker,
resources, txManager, options, notificationSender, resIdHolder);
+ CompletableFuture<InternalTransaction> txFut = readOrStartImplicitTx(
+ in,
+ tsTracker,
+ resources,
+ txManager,
+ tables,
+ options,
+ notificationSender,
+ resIdHolder
+ );
int schemaId = in.unpackInt();
@@ -107,7 +116,7 @@ class ClientTupleRequestBase {
BitSet noValueSet2 = options.contains(READ_SECOND_TUPLE) ?
in.unpackBitSet() : null;
byte[] tupleBytes2 = options.contains(READ_SECOND_TUPLE) ?
in.readBinary() : null;
- return readTableAsync(tableId, tables)
+ return txFut.thenCompose(tx -> readTableAsync(tableId, tables)
.thenCompose(table -> ClientTableCommon.readSchema(schemaId,
table)
.thenApply(schema -> {
var tuple = readTuple(noValueSet, tupleBytes,
options.contains(KEY_ONLY), schema);
@@ -115,7 +124,6 @@ class ClientTupleRequestBase {
? readTuple(noValueSet2, tupleBytes2,
options.contains(KEY_ONLY), schema) : null;
return new ClientTupleRequestBase(tx, table,
tuple, tuple2, resIdHolder[0]);
- }));
-
+ })));
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
index cf3ad63b583..c0dbcb6d0d6 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
@@ -81,8 +81,8 @@ class ClientTuplesRequestBase {
long[] resIdHolder = {0};
- InternalTransaction tx =
- readOrStartImplicitTx(in, tsTracker, resources, txManager,
options, notificationSender, resIdHolder);
+ CompletableFuture<InternalTransaction> txFut =
+ readOrStartImplicitTx(in, tsTracker, resources, txManager,
tables, options, notificationSender, resIdHolder);
int schemaId = in.unpackInt();
@@ -96,7 +96,7 @@ class ClientTuplesRequestBase {
tupleBytes[i] = in.readBinary();
}
- return readTableAsync(tableId, tables)
+ return txFut.thenCompose(tx -> readTableAsync(tableId, tables)
.thenCompose(table -> ClientTableCommon.readSchema(schemaId,
table)
.thenApply(schema -> {
var tuples = new ArrayList<Tuple>(count);
@@ -106,6 +106,6 @@ class ClientTuplesRequestBase {
}
return new ClientTuplesRequestBase(tx, table,
tuples, resIdHolder[0]);
- }));
+ })));
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 452fe0f817e..e538cc4a9eb 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LockManager;
@@ -103,7 +103,7 @@ public class FakeTxManager implements TxManager {
}
@Override
- public TablePartitionId commitPartition() {
+ public ReplicationGroupId commitPartition() {
return null;
}
@@ -289,8 +289,14 @@ public class FakeTxManager implements TxManager {
}
@Override
- public InternalTransaction beginRemote(UUID txId, TablePartitionId
commitPartId, UUID coord, long token, long timeout,
- Consumer<Throwable> cb) {
+ public InternalTransaction beginRemote(
+ UUID txId,
+ ZonePartitionId commitPartId,
+ UUID coord,
+ long token,
+ long timeout,
+ Consumer<Throwable> cb
+ ) {
return null;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 8832e28c220..5f25e9a307f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -2826,6 +2826,12 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
assert commitPartitionId != null : "Commit partition is null [type=" +
request.requestType() + ']';
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this
assert.
+ assert commitPartitionId instanceof ZonePartitionId : "Commit
partition id must be zone aware ["
+ + "requestType=" + request.requestType()
+ + ", commitPartitionId=" + commitPartitionId
+ + ", class=" + commitPartitionId.getClass().getSimpleName() +
']';
+
switch (request.requestType()) {
case RW_DELETE_EXACT: {
return resolveRowByPk(extractPk(searchRow), txId, (rowId, row,
lastCommitTime) -> {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 459626831c6..2a8e878b8b5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -172,7 +172,6 @@ import
org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -440,7 +439,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.schemaId(1)
.primaryKeyIndexId(2)
.name(TABLE_NAME)
- .zoneId(1)
+ .zoneId(ZONE_ID)
.newColumns(columns)
.primaryKeyColumns(IntList.of(0, 1))
.storageProfile(CatalogService.DEFAULT_STORAGE_PROFILE)
@@ -820,7 +819,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
var rowId = new RowId(PART_ID);
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID,
PART_ID);
testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -839,7 +838,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txState = COMMITTED;
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID,
PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(COMMITTED,
localNode.id(), commitPartitionId, clock.now(), null, null));
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -857,7 +856,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
var rowId = new RowId(PART_ID);
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID,
PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING,
localNode.id(), commitPartitionId, null, null, null));
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -876,7 +875,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txState = ABORTED;
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, ZONE_ID,
PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(ABORTED,
localNode.id(), commitPartitionId, null, null, null));
CompletableFuture<ReplicaResult> fut =
doReadOnlySingleGet(testBinaryKey);
@@ -900,7 +899,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
- testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, storeRow, txId, ZONE_ID,
PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
});
@@ -1028,7 +1027,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
- testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, storeRow, txId, ZONE_ID,
PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
});
@@ -1141,7 +1140,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
- testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(rowId, storeRow, txId, ZONE_ID,
PART_ID);
hashIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now(), txId);
});
@@ -1389,10 +1388,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
);
}
- private static TablePartitionIdMessage commitPartitionId() {
- return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage()
+ private static ZonePartitionIdMessage commitPartitionId() {
+ return REPLICA_MESSAGES_FACTORY.zonePartitionIdMessage()
.partitionId(PART_ID)
- .tableId(TABLE_ID)
+ .zoneId(ZONE_ID)
.build();
}
@@ -1566,7 +1565,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// This is needed to check that this row will be skipped by RO tx
and it will see the data anyway.
// TODO https://issues.apache.org/jira/browse/IGNITE-18767 after
this, the following check may be not needed.
RowId emptyRowId = new RowId(PART_ID, new UUID(Long.MIN_VALUE,
Long.MIN_VALUE));
- testMvPartitionStorage.addWrite(emptyRowId, null, tx1, TABLE_ID,
PART_ID);
+ testMvPartitionStorage.addWrite(emptyRowId, null, tx1, ZONE_ID,
PART_ID);
if (committed) {
testMvPartitionStorage.commitWrite(emptyRowId, clock.now(),
tx1);
@@ -1701,7 +1700,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
);
pkStorage().put(futureSchemaVersionRow, rowId);
- testMvPartitionStorage.addWrite(rowId, futureSchemaVersionRow,
futureSchemaVersionTxId, TABLE_ID, PART_ID);
+ testMvPartitionStorage.addWrite(rowId, futureSchemaVersionRow,
futureSchemaVersionTxId, ZONE_ID, PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now(),
futureSchemaVersionTxId);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index bc56cf8e29b..34e1192081e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
import org.jetbrains.annotations.Nullable;
@@ -117,7 +117,7 @@ public interface TxManager extends IgniteComponent {
*
* @return Remote transaction.
*/
- InternalTransaction beginRemote(UUID txId, TablePartitionId commitPartId,
UUID coord, long token, long timeout, Consumer<Throwable> cb);
+ InternalTransaction beginRemote(UUID txId, ZonePartitionId commitPartId,
UUID coord, long token, long timeout, Consumer<Throwable> cb);
/**
* Returns a transaction state meta.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
index 2247e8ab3ad..c72e58b282f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/RemoteReadWriteTransaction.java
@@ -25,7 +25,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -42,7 +42,7 @@ public abstract class RemoteReadWriteTransaction implements
InternalTransaction
private static final String EXCEPTION_MSG = "Remote transaction should
never be finished directly";
private final UUID txId;
- private final TablePartitionId commitGroupId;
+ private final ZonePartitionId commitGroupId;
private final long timeout;
private final UUID coord;
private final String localNodeConsistentId;
@@ -59,7 +59,7 @@ public abstract class RemoteReadWriteTransaction implements
InternalTransaction
* @param localNode Local node.
* @param timeout The timeout.
*/
- RemoteReadWriteTransaction(UUID txId, TablePartitionId commitGroupId, UUID
coord, long token, InternalClusterNode localNode,
+ RemoteReadWriteTransaction(UUID txId, ZonePartitionId commitGroupId, UUID
coord, long token, InternalClusterNode localNode,
long timeout) {
this.txId = txId;
this.commitGroupId = commitGroupId;
@@ -110,7 +110,7 @@ public abstract class RemoteReadWriteTransaction implements
InternalTransaction
}
@Override
- public TablePartitionId commitPartition() {
+ public ZonePartitionId commitPartition() {
return commitGroupId;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 85253160aa1..83231bc39a0 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -952,9 +952,15 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
}
@Override
- public InternalTransaction beginRemote(UUID txId, TablePartitionId
commitPartId, UUID coord, long token, long timeout,
- Consumer<Throwable> cb) {
- assert commitPartId.tableId() > 0 && commitPartId.partitionId() >= 0 :
"Illegal condition for direct mapping: " + commitPartId;
+ public InternalTransaction beginRemote(
+ UUID txId,
+ ZonePartitionId commitPartId,
+ UUID coord,
+ long token,
+ long timeout,
+ Consumer<Throwable> cb
+ ) {
+ assert commitPartId.zoneId() >= 0 && commitPartId.partitionId() >= 0 :
"Illegal condition for direct mapping: " + commitPartId;
// Switch to default timeout if needed.
timeout = timeout == USE_CONFIGURED_TIMEOUT_DEFAULT ?
txConfig.readWriteTimeoutMillis().value() : timeout;