This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6b8ce9f766d IGNITE-28019 Fix client tx rollback on local error (#7689)
6b8ce9f766d is described below
commit 6b8ce9f766df00ff0be0c585218459f9046f909e
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Mon Mar 2 21:02:06 2026 +0300
IGNITE-28019 Fix client tx rollback on local error (#7689)
---
.../ignite/internal/client/TcpClientChannel.java | 2 +-
.../ignite/internal/client/sql/ClientSql.java | 4 +-
.../ignite/internal/client/table/ClientTable.java | 4 +-
.../internal/client/tx/ClientTransaction.java | 18 +++++++--
.../RepeatedFinishClientTransactionTest.java | 3 +-
.../app/client/ItThinClientTransactionsTest.java | 43 ++++++++++++++++++++--
6 files changed, 59 insertions(+), 15 deletions(-)
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index cd4512d8330..8a43ac6c517 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -613,7 +613,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
ClientTransaction tx =
inflights.trackedTransaction(err0.txId());
if (tx != null) {
- tx.discardDirectMappings(true);
+ tx.rollbackAndDiscardDirectMappings(true);
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 92c368a4eea..2a8956905e0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -388,9 +388,7 @@ public class ClientSql implements IgniteSql {
return null;
});
} else {
- // In case of unrecoverable error the tx is already rolled back on coordinator.
- // Need to additionally cleanup directly mapped parts.
- return tx.discardDirectMappings(false).handle((ignored, err0) -> {
+ return tx.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
if (err0 != null) {
err.addSuppressed(err0);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index bb499258db0..3b90a75e62a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -550,9 +550,7 @@ public class ClientTable implements Table {
if (tx0 == null) {
fut.completeExceptionally(ex);
} else {
- // In case of unrecoverable error the tx is already rolled back on coordinator.
- // We need to additionally cleanup directly mapped parts.
- tx0.discardDirectMappings(false).handle((ignored, err0) -> {
+ tx0.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
if (err0 != null) {
ex.addSuppressed(err0);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 414ef106c4e..23970f21c74 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -221,14 +221,13 @@ public class ClientTransaction implements Transaction {
}
/**
- * Discards the directly mapped transaction fragments in case of coordinator side transaction invalidation
- * (either kill or implicit rollback due to mapping failure, see postEnlist).
+ * Rolls back a transaction and discards directly mapped transaction fragments in case of enlistment failure.
*
* @param killed Killed flag.
*
* @return The future.
*/
- public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+ public CompletableFuture<Void> rollbackAndDiscardDirectMappings(boolean killed) {
enlistPartitionLock.writeLock().lock();
try {
@@ -241,7 +240,18 @@ public class ClientTransaction implements Transaction {
enlistPartitionLock.writeLock().unlock();
}
- return sendDiscardRequests().handle((r, e) -> {
+ // The transaction could not be yet rolled back on a coordinator. Make sure it's rolled back and client resource is cleaned up.
+ CompletableFuture<Void> rollbackFut = ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> {
+ w.out().packLong(id);
+
+ if (!isReadOnly && w.clientChannel().protocolContext().isFeatureSupported(TX_PIGGYBACK)) {
+ w.out().packInt(0); // Don't send direct enlistments.
+ }
+ }, r -> null);
+
+ // It's safe to rollback proxy and direct parts of transactions concurrently.
+ // Write intent resolution will ignore WIs from PENDING transactions.
+ return CompletableFuture.allOf(rollbackFut, sendDiscardRequests()).handle((r, e) -> {
setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
ch.inflights().erase(txId());
this.finishFut.complete(null);
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index 5d750fa5fa0..663086caa78 100644
---
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -242,7 +242,8 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
private static Stream<Arguments> rollbackClosureFactory() {
return Stream.of(
argumentSet("rollback", (Consumer<ClientTransaction>) ClientTransaction::rollback),
- argumentSet("discard", (Consumer<ClientTransaction>) clientTransaction -> clientTransaction.discardDirectMappings(false))
+ argumentSet("discard",
+ (Consumer<ClientTransaction>) clientTransaction -> clientTransaction.rollbackAndDiscardDirectMappings(false))
);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index e8fa68cacb2..7a86359cbdc 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -1495,6 +1495,43 @@ public class ItThinClientTransactionsTest extends ItAbstractThinClientTest {
assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key2, key3, key4)), willSucceedFast());
}
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ public void testRollbackOnLocalError(KillTestContext ctx) throws Exception {
+ ClientTable table = (ClientTable) table();
+ ClientSql sql = (ClientSql) client().sql();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map = table.partitionDistribution().primaryReplicasAsync().join();
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(100, 10, map, server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(100, 10, map, server1.cluster().localNode(), table);
+
+ // Init SQL mappings.
+ Tuple key0 = tuples0.get(0);
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+ await().atMost(2, TimeUnit.SECONDS)
+ .until(() -> sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+ ClientLazyTransaction txProxy = (ClientLazyTransaction) client().transactions().begin();
+
+ Tuple key = tuples0.get(1);
+ Tuple key3 = tuples1.get(0);
+
+ assertThat(ctx.put.apply(client(), txProxy, key), willSucceedFast()); // Proxy mode.
+
+ assertThat(ctx.put.apply(client(), txProxy, key3), willSucceedFast()); // Direct mode.
+
+ Tuple key2 = Tuple.create().set("id1", "1"); // Intentionally use wrong schema.
+ assertThat(ctx.put.apply(client(), txProxy, key2), willThrowWithCauseOrSuppressed(IgniteException.class));
+
+ // Ensure all enlisted keys are unlocked.
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key3)), willSucceedFast());
+ }
+
@AfterEach
protected void validateInflights() throws NoSuchFieldException {
System.out.println("DBG: validateInflights");
@@ -1565,12 +1602,12 @@ public class ItThinClientTransactionsTest extends ItAbstractThinClientTest {
private static CompletableFuture<?> putSql(IgniteClient client, Transaction tx, Tuple key) {
return client.sql()
- .executeAsync(tx, format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
- key.intValue(0) + "");
+ .executeAsync(tx, format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.value(0),
+ key.value(0) + "");
}
private static CompletableFuture<?> putKv(IgniteClient client, Transaction tx, Tuple key) {
- return client.tables().tables().get(0).keyValueView().putAsync(tx, key, val(key.intValue(0) + ""));
+ return client.tables().tables().get(0).keyValueView().putAsync(tx, key, val(key.value(0) + ""));
}
private static class KillTestContext {