This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3eb50900656 IGNITE-27651 Clean up direct tx on client disconnect
(#7779)
3eb50900656 is described below
commit 3eb50900656064276d76fc346d4e90ce7765e892
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Mar 19 13:55:50 2026 +0100
IGNITE-27651 Clean up direct tx on client disconnect (#7779)
Store remote (direct) transaction enlistments in the client resource registry
and clean them up when the client disconnects.
---
.../client/handler/ClientResourceRegistry.java | 82 ++++++++++++++--
.../handler/requests/table/ClientTableCommon.java | 19 ++++
.../tx/ClientTransactionDiscardRequest.java | 30 +-----
.../tx/ClientTxPartitionEnlistmentCleaner.java | 88 +++++++++++++++++
.../ignite/internal/client/ClientFutureUtils.java | 20 +---
.../internal/client/tx/ClientTransaction.java | 3 +-
.../apache/ignite/internal/client/tx/DEVNOTES.md | 60 ++++++++++++
.../apache/ignite/client/fakes/FakeTxManager.java | 8 +-
.../ignite/internal/util/ExceptionUtils.java | 25 +++++
.../ignite/jdbc/ItJdbcConnectionFailoverTest.java | 5 +-
.../client/ItThinClientTransactionCleanupTest.java | 106 +++++++++++++++++++++
.../org/apache/ignite/internal/tx/TxManager.java | 11 ++-
.../ignite/internal/tx/impl/OrphanDetector.java | 62 +++++++-----
.../ignite/internal/tx/impl/TxManagerImpl.java | 27 ++++--
.../internal/tx/impl/OrphanDetectorTest.java | 2 +-
15 files changed, 455 insertions(+), 93 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
index 8ed619b3072..5ef694506e7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
@@ -17,12 +17,21 @@
package org.apache.ignite.client.handler;
+import static
org.apache.ignite.internal.util.ExceptionUtils.existingCauseOrSuppressed;
+
+import java.util.HashSet;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import
org.apache.ignite.client.handler.requests.tx.ClientTxPartitionEnlistmentCleaner;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ErrorGroups.Client;
import org.apache.ignite.lang.IgniteException;
@@ -49,6 +58,8 @@ public class ClientResourceRegistry {
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ private final ConcurrentHashMap<UUID, ClientTxPartitionEnlistmentCleaner>
txCleaners = new ConcurrentHashMap<>();
+
/**
* Stores the resource and returns the generated id.
*
@@ -112,6 +123,46 @@ public class ClientResourceRegistry {
}
}
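+ /**
+ * Records that a remote transaction enlisted a partition on this node, so that the enlistment
+ * can be cleaned up if the client disconnects before the transaction finishes.
+ *
+ * @param txId Transaction ID.
+ * @param tableId Table ID.
+ * @param partitionId Partition ID.
+ * @param txManager Transaction manager responsible for coordinating and cleaning up the transaction.
+ * @param tables Tables facade used to resolve table information for enlisted partitions.
+ * @throws IgniteInternalCheckedException If the registry is already closed (the client has disconnected).
+ */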
+ public void addTxCleaner(UUID txId, int tableId, int partitionId,
TxManager txManager, IgniteTablesInternal tables)
+ throws IgniteInternalCheckedException {
+ enter();
+
+ try {
+ txCleaners
+ .computeIfAbsent(txId, k -> new
ClientTxPartitionEnlistmentCleaner(txId, txManager, tables))
+ .addEnlistment(tableId, partitionId);
+ } finally {
+ leave();
+ }
+ }
+
+ /**
+ * Removes the transaction cleaner associated with the given transaction ID.
+ * Does nothing if the registry is already closed.
+ *
+ * @param txId Transaction ID whose cleaner should be removed.
+ */
+ public void removeTxCleaner(UUID txId) throws
IgniteInternalCheckedException {
+ if (!busyLock.enterBusy()) {
+ // Can be called on disconnect. Removing from a closed registry is
not a problem.
+ return;
+ }
+
+ try {
+ txCleaners.remove(txId);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* Closes the registry and releases all resources.
*/
@@ -122,24 +173,35 @@ public class ClientResourceRegistry {
busyLock.block();
- IgniteInternalException ex = null;
+ AtomicReference<IgniteInternalException> ex = new AtomicReference<>();
+ var dejaVu = new HashSet<Throwable>();
- for (ClientResource r : res.values()) {
+ Consumer<Runnable> releaseSafe = r -> {
try {
- r.release();
- } catch (Exception e) {
- if (ex == null) {
- ex = new IgniteInternalException(e);
- } else {
- ex.addSuppressed(e);
+ r.run();
+ } catch (Throwable e) {
+ if (ex.get() == null) {
+ ex.set(new IgniteInternalException(e));
+ existingCauseOrSuppressed(e, dejaVu); // Seed dejaVu.
+ } else if (!existingCauseOrSuppressed(e, dejaVu)) {
+ ex.get().addSuppressed(e);
}
}
+ };
+
+ for (ClientResource r : res.values()) {
+ releaseSafe.accept(r::release);
}
res.clear();
- if (ex != null) {
- throw ex;
+ for (var cleaner : txCleaners.values()) {
+ // Don't block the thread, clean in background.
discardLocalWriteIntents swallows errors anyway.
+ releaseSafe.accept(() -> cleaner.clean(true));
+ }
+
+ if (ex.get() != null) {
+ throw ex.get();
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 8d8851dd094..3a26a7afc55 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -62,6 +62,7 @@ import
org.apache.ignite.internal.tx.TransactionKilledException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.TemporalNativeType;
@@ -544,6 +545,24 @@ public class ClientTableCommon {
MESSAGE_TX_ALREADY_FINISHED_DUE_TO_TIMEOUT + " [tx=" + remote + "].");
}
+ // Track this remote enlistment for cleanup if
client disconnects.
+ try {
+ resources.addTxCleaner(txId, tableId,
commitPart, txManager, (IgniteTablesInternal) tables);
+ } catch (IgniteInternalCheckedException e) {
+ // Client disconnected (resource registry
closed).
+ try {
+ remote.rollback();
+ } catch (Exception ex) {
+ e.addSuppressed(ex);
+ }
+
+ throw new IgniteException(e.traceId(),
e.code(), "Client disconnected, tx rolled back: " + remote, e);
+ }
+
+ // Stop tracking on tx finish.
+ txManager.resourceRegistry().register(
+ new FullyQualifiedResourceId(txId, txId),
txId, () -> () -> resources.removeTxCleaner(txId));
+
return remote;
});
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
index dc823da38ae..e6ae2a61664 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
@@ -17,22 +17,13 @@
package org.apache.ignite.client.handler.requests.tx;
-import static java.util.stream.Collectors.toList;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.table.IgniteTablesInternal;
-import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
/**
* Client transaction direct mapping discard request.
@@ -50,28 +41,17 @@ public class ClientTransactionDiscardRequest {
TxManager txManager,
IgniteTablesInternal igniteTables
) throws IgniteInternalCheckedException {
- Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedPartitions
= new HashMap<>();
-
UUID txId = in.unpackUuid();
-
int cnt = in.unpackInt(); // Number of direct enlistments.
+
+ var cleaner = new ClientTxPartitionEnlistmentCleaner(txId, txManager,
igniteTables);
+
for (int i = 0; i < cnt; i++) {
int tableId = in.unpackInt();
int partId = in.unpackInt();
-
- TableViewInternal table = igniteTables.cachedTable(tableId);
-
- if (table != null) {
- ZonePartitionId replicationGroupId =
table.internalTable().targetReplicationGroupId(partId);
- enlistedPartitions.computeIfAbsent(replicationGroupId, k ->
new PendingTxPartitionEnlistment("UNUSED", 0))
- .addTableId(tableId);
- }
+ cleaner.addEnlistment(tableId, partId);
}
- List<EnlistedPartitionGroup> enlistedPartitionGroups =
enlistedPartitions.entrySet().stream()
- .map(entry -> new EnlistedPartitionGroup(entry.getKey(),
entry.getValue().tableIds()))
- .collect(toList());
-
- return txManager.discardLocalWriteIntents(enlistedPartitionGroups,
txId).handle((res, err) -> null);
+ return cleaner.clean(false).handle((res, err) -> null);
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java
new file mode 100644
index 00000000000..d7843e7814e
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.tx;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+
+/**
+ * Helper class to clean up direct transaction enlistments on the server side
for a client connection.
+ */
+public class ClientTxPartitionEnlistmentCleaner {
+ private final UUID txId;
+
+ private final TxManager txManager;
+
+ private final IgniteTablesInternal igniteTables;
+
+ private final Map<ZonePartitionId, PendingTxPartitionEnlistment>
enlistedPartitions = new ConcurrentHashMap<>();
+
+ /**
+ * Creates a new instance of the transaction partition enlistment cleaner.
+ *
+ * @param txId Transaction ID.
+ * @param txManager Transaction manager.
+ * @param igniteTables Ignite tables.
+ */
+ public ClientTxPartitionEnlistmentCleaner(UUID txId, TxManager txManager,
IgniteTablesInternal igniteTables) {
+ this.txId = txId;
+ this.txManager = txManager;
+ this.igniteTables = igniteTables;
+ }
+
+ /**
+ * Adds a partition enlistment for the given table and partition.
+ *
+ * @param tableId Table ID.
+ * @param partId Partition ID.
+ */
+ public void addEnlistment(int tableId, int partId) {
+ TableViewInternal table = igniteTables.cachedTable(tableId);
+
+ if (table != null) {
+ ZonePartitionId replicationGroupId =
table.internalTable().targetReplicationGroupId(partId);
+ enlistedPartitions.computeIfAbsent(replicationGroupId, k -> new
PendingTxPartitionEnlistment("UNUSED", 0))
+ .addTableId(tableId);
+ }
+ }
+
+ /**
+ * Discards local write intents for all enlisted partitions.
+ *
+ * @param abortTx Whether to abort the transaction as well.
+ * @return Future that completes when cleanup is done.
+ */
+ public CompletableFuture<Void> clean(boolean abortTx) {
+ List<EnlistedPartitionGroup> enlistedPartitionGroups =
enlistedPartitions.entrySet().stream()
+ .map(entry -> new EnlistedPartitionGroup(entry.getKey(),
entry.getValue().tableIds()))
+ .collect(toList());
+
+ return txManager.discardLocalWriteIntents(enlistedPartitionGroups,
txId, abortTx);
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
index 6960cf781b1..fcceb3cfdb8 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.client;
+import static
org.apache.ignite.internal.util.ExceptionUtils.existingCauseOrSuppressed;
+
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
@@ -99,24 +101,6 @@ class ClientFutureUtils {
});
}
- private static boolean existingCauseOrSuppressed(Throwable t,
HashSet<Throwable> dejaVu) {
- if (t == null) {
- return false;
- }
-
- if (!dejaVu.add(t)) {
- return true;
- }
-
- for (Throwable sup : t.getSuppressed()) {
- if (existingCauseOrSuppressed(sup, dejaVu)) {
- return true;
- }
- }
-
- return existingCauseOrSuppressed(t.getCause(), dejaVu);
- }
-
static class RetryContext {
int attempt;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index f5730c3d4f0..e910a47f1e7 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -297,8 +297,7 @@ public class ClientTransaction implements Transaction {
ClientChannel ch = reliableChannel.getNodeChannel(entry.getKey());
if (ch == null) {
- // Connection is lost, the transaction will be cleaned up by
other means.
- // TODO https://issues.apache.org/jira/browse/IGNITE-27651
+ // Connection is lost, the transaction will be cleaned up by
the server.
continue;
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DEVNOTES.md
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DEVNOTES.md
new file mode 100644
index 00000000000..01f00885273
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DEVNOTES.md
@@ -0,0 +1,60 @@
+# Client Direct Transaction Coordinator
+
+The "lightweight client tx coordinator" is an optimization that allows clients to bypass the traditional
+coordinator-proxy pattern and communicate directly with partition owners.
+
+Direct mapping requires the client to already have active connections to the partition owner nodes.
+
+## Two Operating Modes
+
+1. Proxy Mode (Traditional): Client → Coordinator → Partition Owners
+2. Direct Mapping Mode (Lightweight): Client → Partition Owners (directly)
+
+## Protocol
+
+1. Transaction Start (Piggybacked)
+   - Request: `TX_ID_FIRST_DIRECT = -1L` embedded in the first data operation (e.g., `TUPLE_UPSERT`)
+   - Includes: observable timestamp, read-only flag, timeout, options
+   - Response: transaction ID, UUID, coordinator ID, timeout
+   - Location: `ClientTableCommon#readTx`
+
+2. Direct Operations
+   - Request: `TX_ID_DIRECT = 0L` marker with enlistment metadata
+   - Includes: enlistment token, transaction UUID, commit table ID, commit partition ID, coordinator UUID
+   - Sent to: the partition owner directly (not the coordinator)
+   - Location: `DirectTxUtils.writeTx`
+
+3. Transaction Commit
+   - Operation: `ClientOp.TX_COMMIT = 44`
+   - Includes: resource ID + list of all direct enlistments (tableId, partitionId, consistentId, token)
+   - The server merges client enlistments with server-side enlistments
+   - Location: `ClientTransactionCommitRequest.java`
+
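+4. Transaction Discard (Cleanup)
+   - Operation: `ClientOp.TX_DISCARD = 75`
+   - When: the transaction fails or needs to be rolled back
+   - Includes: transaction UUID + list of (tableId, partitionId) pairs
+   - Purpose: clean up direct enlistments on partition owners
+   - Location: `ClientTransaction#sendDiscardRequests`
+   - If the client has no connection to a partition owner anymore, it skips that node;
+     the server cleans up the enlistments on disconnect instead (see below).
+
+5. Server-Side Cleanup on Client Disconnect
+   - Every direct enlistment received by a node is recorded in the connection's `ClientResourceRegistry`
+     (`addTxCleaner`, called from `ClientTableCommon`).
+   - When the transaction finishes, the record is removed via a resource registered in `TxManager#resourceRegistry()`.
+   - When the client disconnects and the registry is closed, all still-tracked transactions have their local
+     write intents discarded and are aborted (`ClientTxPartitionEnlistmentCleaner#clean(true)`) in the background,
+     so their locks are released.
+   - If the registry is already closed when a new direct enlistment arrives, the remote transaction is rolled back
+     and the request fails.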
+
+## Lifecycle
+
+1. The client creates a lazy transaction (no network call).
+2. The first operation triggers:
+   - `DirectTxUtils.ensureStarted()` checks if colocated
+   - `DirectTxUtils.writeTx()` writes `TX_ID_FIRST_DIRECT` (-1)
+   - The request is sent directly to the partition owner
+   - The server starts the transaction and returns its metadata
+   - `DirectTxUtils.readTx()` creates the `ClientTransaction`
+
+3. Subsequent operations:
+   - `DirectTxUtils.resolveChannel()` picks the partition owner
+   - `ClientTransaction.enlistDirect()` tracks the enlistment
+   - `DirectTxUtils.writeTx()` writes `TX_ID_DIRECT` (0) + enlistment token
+   - The client communicates directly with the partition owner
+
+4. Commit:
+   - Collects all enlistments from the `ClientTransaction.enlisted` map
+   - Sends `TX_COMMIT` with enlistment metadata to the coordinator
+   - The coordinator merges client + server enlistments
+   - Validates tokens and commits the transaction
\ No newline at end of file
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 97183e24672..78f78cb2f2c 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.apache.ignite.tx.TransactionException;
@@ -286,7 +287,7 @@ public class FakeTxManager implements TxManager {
}
@Override
- public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
+ public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId,
boolean abortTx) {
return nullCompletedFuture();
}
@@ -295,6 +296,11 @@ public class FakeTxManager implements TxManager {
return 0;
}
+ @Override
+ public RemotelyTriggeredResourceRegistry resourceRegistry() {
+ return null;
+ }
+
@Override
public int finished() {
return 0;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 0162113cda3..4969432747e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -455,6 +455,31 @@ public final class ExceptionUtils {
return false;
}
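+ /**
+ * Checks whether the given throwable, or any throwable reachable from it through its suppressed exceptions
+ * or its cause chain, is already contained in {@code dejaVu}. Every not-yet-known throwable visited during
+ * the check is added to {@code dejaVu}; the traversal stops at the first already-known throwable.
+ *
+ * @param t Throwable to check; {@code null} is treated as not seen.
+ * @param dejaVu Known throwables; updated in place.
+ * @return {@code true} if {@code t} or any throwable in its hierarchy was seen before, {@code false} otherwise.
+ */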
+ public static boolean existingCauseOrSuppressed(@Nullable Throwable t,
Set<Throwable> dejaVu) {
+ if (t == null) {
+ return false;
+ }
+
+ if (!dejaVu.add(t)) {
+ return true;
+ }
+
+ for (Throwable sup : t.getSuppressed()) {
+ if (existingCauseOrSuppressed(sup, dejaVu)) {
+ return true;
+ }
+ }
+
+ return existingCauseOrSuppressed(t.getCause(), dejaVu);
+ }
+
/**
* Unwraps exception cause from wrappers like CompletionException and
ExecutionException.
*
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
index f8eac10274f..6e4a2cd4e34 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
@@ -34,7 +34,6 @@ import
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.jdbc.JdbcConnection;
import org.apache.ignite.internal.lang.RunnableX;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -197,7 +196,6 @@ public class ItJdbcConnectionFailoverTest extends
ClusterPerTestIntegrationTest
* Ensures that the client receives a meaningful exception when the node
holding the client transaction goes down.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27091")
void testTransactionCannotBeUsedAfterNodeRestart() throws SQLException {
int nodesCount = 3;
cluster.startAndInit(nodesCount, new int[]{2});
@@ -221,8 +219,7 @@ public class ItJdbcConnectionFailoverTest extends
ClusterPerTestIntegrationTest
cluster.startNode(1);
//noinspection ThrowableNotThrown
- assertThrowsSqlException("Transaction context has been lost
due to connection errors",
- () -> stmt.execute(dummyQuery));
+ assertThrowsSqlException("Transaction is already finished", ()
-> stmt.execute(dummyQuery));
}
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java
new file mode 100644
index 00000000000..bbae8717cdb
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest.generateKeysForNode;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for client transaction cleanup on disconnect.
+ */
+@SuppressWarnings({"DataFlowIssue"})
+public class ItThinClientTransactionCleanupTest extends
ItAbstractThinClientTest {
+ /**
+ * Tests that locks are released when client disconnects with a
transaction having direct enlistments.
+ */
+ @Test
+ void testClientDisconnectReleasesTxLocksFast() {
+ try (IgniteClient client =
IgniteClient.builder().addresses(getClientAddresses().toArray(new
String[0])).build()) {
+ var table = (ClientTable) client.tables().table(TABLE_NAME);
+ Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicas();
+
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(300, 1, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(310, 1, map,
server1.cluster().localNode(), table);
+
+ // Perform a mix of operations to trigger direct tx logic.
+ Map<Tuple, Tuple> data = new HashMap<>();
+
+ data.put(tuples0.get(0), val(tuples0.get(0).intValue(0) + ""));
+ data.put(tuples1.get(0), val(tuples1.get(0).intValue(0) + ""));
+
+ ClientLazyTransaction tx0 = (ClientLazyTransaction)
client.transactions().begin();
+
+ table.keyValueView().putAll(tx0, data);
+
+ for (Entry<Tuple, Tuple> entry : data.entrySet()) {
+ table.keyValueView().put(tx0, entry.getKey(),
entry.getValue());
+ }
+
+ assertThat(txLockCount(), greaterThanOrEqualTo(2));
+
+ // Disconnect without commit or rollback.
+ }
+
+ await().atMost(Duration.ofSeconds(2))
+ .untilAsserted(() -> assertEquals(0, txLockCount()));
+ }
+
+ private int txLockCount() {
+ int count = 0;
+
+ for (int i = 0; i < nodes(); i++) {
+ count += txLockCount(server(i));
+ }
+
+ return count;
+ }
+
+ private static int txLockCount(Ignite server) {
+ IgniteImpl ignite = unwrapIgniteImpl(server);
+ LockManager lockManager = ignite.txManager().lockManager();
+ return CollectionUtils.count(lockManager.locks());
+ }
+
+ private static Tuple val(String v) {
+ return Tuple.create().set(COLUMN_VAL, v);
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 0544d860d8e..c113153d880 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import org.jetbrains.annotations.Nullable;
@@ -287,8 +288,9 @@ public interface TxManager extends IgniteComponent {
*
* @param groups Groups.
* @param txId Transaction id.
+ * @param abortTx If {@code true}, the transaction will be aborted.
*/
- CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId);
+ CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId,
boolean abortTx);
/**
* Returns lock retry count.
@@ -297,6 +299,13 @@ public interface TxManager extends IgniteComponent {
*/
int lockRetryCount();
+ /**
+ * Returns the resource registry.
+ *
+ * @return Resource registry.
+ */
+ RemotelyTriggeredResourceRegistry resourceRegistry();
+
/**
* Returns a number of finished transactions.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
index 8b9e77f7c99..ed6263fac0a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.tx.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
@@ -25,7 +26,6 @@ import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
-import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
@@ -37,7 +37,6 @@ import java.util.function.Supplier;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -183,7 +182,7 @@ public class OrphanDetector {
// We can path the work to another thread without any condition,
because it is a very rare scenario in which the transaction
// coordinator left topology.
- partitionOperationsExecutor.execute(() ->
sendTxRecoveryMessage(txState.commitPartitionId(), txId));
+ partitionOperationsExecutor.execute(() ->
sendTxRecoveryMessageIgnoreErrors(txState.commitPartitionId(), txId));
}
// TODO: https://issues.apache.org/jira/browse/IGNITE-21153
@@ -197,28 +196,9 @@ public class OrphanDetector {
* @param cmpPartGrp Replication group of commit partition.
* @param txId Transaction id.
*/
- private void sendTxRecoveryMessage(ZonePartitionId cmpPartGrp, UUID txId) {
-
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp)
- .thenCompose(replicaMeta -> {
- InternalClusterNode commitPartPrimaryNode =
- replicaMeta != null ?
topologyService.getByConsistentId(replicaMeta.getLeaseholder()) : null;
-
- if (commitPartPrimaryNode == null) {
- LOG.warn(
- "The primary replica of the commit partition
is not available [commitPartGrp={}, tx={}]",
- cmpPartGrp,
- txId
- );
-
- return nullCompletedFuture();
- }
-
- return replicaService.invoke(commitPartPrimaryNode,
TX_MESSAGES_FACTORY.txRecoveryMessage()
-
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, cmpPartGrp))
-
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
- .txId(txId)
- .build());
- }).exceptionally(throwable -> {
+ private void sendTxRecoveryMessageIgnoreErrors(ZonePartitionId cmpPartGrp,
UUID txId) {
+ sendTxRecoveryMessage(cmpPartGrp, txId)
+ .exceptionally(throwable -> {
if (throwable != null) {
LOG.warn("A recovery message for the transaction was
handled with the error {}.",
throwable, formatTxInfo(txId,
txLocalStateStorage));
@@ -228,6 +208,38 @@ public class OrphanDetector {
});
}
+ /**
+ * Sends transaction recovery message to commit partition for particular
transaction.
+ *
+ * @param txId Transaction id.
+ * @return Future.
+ */
+ CompletableFuture<Object> sendTxRecoveryMessage(UUID txId) {
+ TxStateMeta txState = txLocalStateStorage.state(txId);
+
+ if (txState == null || isFinalState(txState.txState())) {
+ return completedFuture(null);
+ }
+
+ return sendTxRecoveryMessage(txState.commitPartitionId(), txId);
+ }
+
+ /**
+ * Sends transaction recovery message to commit partition for particular
transaction.
+ *
+ * @param cmpPartGrp Replication group of commit partition.
+ * @param txId Transaction id.
+ * @return Future.
+ */
+ private CompletableFuture<Object> sendTxRecoveryMessage(ZonePartitionId
cmpPartGrp, UUID txId) {
+ return
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp)
+ .thenCompose(replicaMeta ->
replicaService.invoke(replicaMeta.getLeaseholder(),
TX_MESSAGES_FACTORY.txRecoveryMessage()
+
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, cmpPartGrp))
+
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+ .txId(txId)
+ .build()));
+ }
+
/**
* Performs a life check for the transaction coordinator.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 5bbee2749f1..be03cc8af9b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -241,6 +241,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
private final ConcurrentLinkedQueue<CompletableFuture<?>> stopFuts = new
ConcurrentLinkedQueue<>();
+ private final RemotelyTriggeredResourceRegistry resourcesRegistry;
+
/**
* Test-only constructor.
*
@@ -366,6 +368,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
this.commonScheduler = commonScheduler;
this.failureProcessor = failureProcessor;
this.metricsManager = metricManager;
+ this.resourcesRegistry = resourcesRegistry;
placementDriverHelper = new PlacementDriverHelper(placementDriver,
clockService);
@@ -1195,12 +1198,19 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
}
@Override
- public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
- return txCleanupRequestHandler.discardLocalWriteIntents(groups,
txId).handle((r, e) -> {
- // We don't need tx state any more.
- updateTxMeta(txId, old -> null);
- return null;
- });
+ public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId,
boolean abortTx) {
+ CompletableFuture<Object> f = nullCompletedFuture();
+
+ if (abortTx) {
+ f = orphanDetector.sendTxRecoveryMessage(txId);
+ }
+
+ return f.thenCompose(unused ->
txCleanupRequestHandler.discardLocalWriteIntents(groups, txId))
+ .handle((r, e) -> {
+ // We don't need tx state any more.
+ updateTxMeta(txId, old -> null);
+ return null;
+ });
}
@Override
@@ -1208,6 +1218,11 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return lockRetryCount;
}
+ @Override
+ public RemotelyTriggeredResourceRegistry resourceRegistry() {
+ return resourcesRegistry;
+ }
+
@Override
public Executor writeIntentSwitchExecutor() {
return writeIntentSwitchPool;
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
index 798d2fc6fe5..fd9e3f78106 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
@@ -326,7 +326,7 @@ public class OrphanDetectorTest extends
BaseIgniteAbstractTest {
assertEquals(TxState.ABANDONED, orphanState.txState());
// Send tx recovery message.
- verify(replicaService).invoke(any(InternalClusterNode.class), any());
+ verify(replicaService).invoke(any(String.class), any());
assertThat(acquire, willThrow(LockException.class, "Failed to acquire
the abandoned lock due to a possible deadlock"));