This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3eb50900656 IGNITE-27651 Clean up direct tx on client disconnect (#7779)
3eb50900656 is described below

commit 3eb50900656064276d76fc346d4e90ce7765e892
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Mar 19 13:55:50 2026 +0100

    IGNITE-27651 Clean up direct tx on client disconnect (#7779)
    
    Store remote enlistments in client resource registry and clean up on disconnect.
---
 .../client/handler/ClientResourceRegistry.java     |  82 ++++++++++++++--
 .../handler/requests/table/ClientTableCommon.java  |  19 ++++
 .../tx/ClientTransactionDiscardRequest.java        |  30 +-----
 .../tx/ClientTxPartitionEnlistmentCleaner.java     |  88 +++++++++++++++++
 .../ignite/internal/client/ClientFutureUtils.java  |  20 +---
 .../internal/client/tx/ClientTransaction.java      |   3 +-
 .../apache/ignite/internal/client/tx/DEVNOTES.md   |  60 ++++++++++++
 .../apache/ignite/client/fakes/FakeTxManager.java  |   8 +-
 .../ignite/internal/util/ExceptionUtils.java       |  25 +++++
 .../ignite/jdbc/ItJdbcConnectionFailoverTest.java  |   5 +-
 .../client/ItThinClientTransactionCleanupTest.java | 106 +++++++++++++++++++++
 .../org/apache/ignite/internal/tx/TxManager.java   |  11 ++-
 .../ignite/internal/tx/impl/OrphanDetector.java    |  62 +++++++-----
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  27 ++++--
 .../internal/tx/impl/OrphanDetectorTest.java       |   2 +-
 15 files changed, 455 insertions(+), 93 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
index 8ed619b3072..5ef694506e7 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
@@ -17,12 +17,21 @@
 
 package org.apache.ignite.client.handler;
 
+import static 
org.apache.ignite.internal.util.ExceptionUtils.existingCauseOrSuppressed;
+
+import java.util.HashSet;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import 
org.apache.ignite.client.handler.requests.tx.ClientTxPartitionEnlistmentCleaner;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.ErrorGroups.Client;
 import org.apache.ignite.lang.IgniteException;
@@ -49,6 +58,8 @@ public class ClientResourceRegistry {
 
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
+    private final ConcurrentHashMap<UUID, ClientTxPartitionEnlistmentCleaner> 
txCleaners = new ConcurrentHashMap<>();
+
     /**
      * Stores the resource and returns the generated id.
      *
@@ -112,6 +123,46 @@ public class ClientResourceRegistry {
         }
     }
 
+    /**
+     * Records that a remote transaction enlisted a partition on this node.
+     *
+     * @param txId Transaction ID.
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     * @param txManager Transaction manager responsible for coordinating and 
cleaning up the transaction.
+     * @param tables Tables facade used to resolve table information for 
enlisted partitions.
+     */
+    public void addTxCleaner(UUID txId, int tableId, int partitionId, 
TxManager txManager, IgniteTablesInternal tables)
+            throws IgniteInternalCheckedException {
+        enter();
+
+        try {
+            txCleaners
+                    .computeIfAbsent(txId, k -> new 
ClientTxPartitionEnlistmentCleaner(txId, txManager, tables))
+                    .addEnlistment(tableId, partitionId);
+        } finally {
+            leave();
+        }
+    }
+
+    /**
+     * Removes the transaction cleaner associated with the given transaction 
ID.
+     *
+     * @param txId Transaction ID whose cleaner should be removed.
+     */
+    public void removeTxCleaner(UUID txId) throws 
IgniteInternalCheckedException {
+        if (!busyLock.enterBusy()) {
+            // Can be called on disconnect. Removing from a closed registry is 
not a problem.
+            return;
+        }
+
+        try {
+            txCleaners.remove(txId);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /**
      * Closes the registry and releases all resources.
      */
@@ -122,24 +173,35 @@ public class ClientResourceRegistry {
 
         busyLock.block();
 
-        IgniteInternalException ex = null;
+        AtomicReference<IgniteInternalException> ex = new AtomicReference<>();
+        var dejaVu = new HashSet<Throwable>();
 
-        for (ClientResource r : res.values()) {
+        Consumer<Runnable> releaseSafe = r -> {
             try {
-                r.release();
-            } catch (Exception e) {
-                if (ex == null) {
-                    ex = new IgniteInternalException(e);
-                } else {
-                    ex.addSuppressed(e);
+                r.run();
+            } catch (Throwable e) {
+                if (ex.get() == null) {
+                    ex.set(new IgniteInternalException(e));
+                    existingCauseOrSuppressed(e, dejaVu); // Seed dejaVu.
+                } else if (!existingCauseOrSuppressed(e, dejaVu)) {
+                    ex.get().addSuppressed(e);
                 }
             }
+        };
+
+        for (ClientResource r : res.values()) {
+            releaseSafe.accept(r::release);
         }
 
         res.clear();
 
-        if (ex != null) {
-            throw ex;
+        for (var cleaner : txCleaners.values()) {
+            // Don't block the thread, clean in background. 
discardLocalWriteIntents swallows errors anyway.
+            releaseSafe.accept(() -> cleaner.clean(true));
+        }
+
+        if (ex.get() != null) {
+            throw ex.get();
         }
     }
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 8d8851dd094..3a26a7afc55 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -62,6 +62,7 @@ import 
org.apache.ignite.internal.tx.TransactionKilledException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxPriority;
 import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
 import org.apache.ignite.internal.type.DecimalNativeType;
 import org.apache.ignite.internal.type.NativeType;
 import org.apache.ignite.internal.type.TemporalNativeType;
@@ -544,6 +545,24 @@ public class ClientTableCommon {
                                         
MESSAGE_TX_ALREADY_FINISHED_DUE_TO_TIMEOUT + " [tx=" + remote + "].");
                             }
 
+                            // Track this remote enlistment for cleanup if 
client disconnects.
+                            try {
+                                resources.addTxCleaner(txId, tableId, 
commitPart, txManager, (IgniteTablesInternal) tables);
+                            } catch (IgniteInternalCheckedException e) {
+                                // Client disconnected (resource registry 
closed).
+                                try {
+                                    remote.rollback();
+                                } catch (Exception ex) {
+                                    e.addSuppressed(ex);
+                                }
+
+                                throw new IgniteException(e.traceId(), 
e.code(), "Client disconnected, tx rolled back: " + remote, e);
+                            }
+
+                            // Stop tracking on tx finish.
+                            txManager.resourceRegistry().register(
+                                    new FullyQualifiedResourceId(txId, txId), 
txId, () -> () -> resources.removeTxCleaner(txId));
+
                             return remote;
                         });
             }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
index dc823da38ae..e6ae2a61664 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
@@ -17,22 +17,13 @@
 
 package org.apache.ignite.client.handler.requests.tx;
 
-import static java.util.stream.Collectors.toList;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
-import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
 
 /**
  * Client transaction direct mapping discard request.
@@ -50,28 +41,17 @@ public class ClientTransactionDiscardRequest {
             TxManager txManager,
             IgniteTablesInternal igniteTables
     ) throws IgniteInternalCheckedException {
-        Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedPartitions 
= new HashMap<>();
-
         UUID txId = in.unpackUuid();
-
         int cnt = in.unpackInt(); // Number of direct enlistments.
+
+        var cleaner = new ClientTxPartitionEnlistmentCleaner(txId, txManager, 
igniteTables);
+
         for (int i = 0; i < cnt; i++) {
             int tableId = in.unpackInt();
             int partId = in.unpackInt();
-
-            TableViewInternal table = igniteTables.cachedTable(tableId);
-
-            if (table != null) {
-                ZonePartitionId replicationGroupId = 
table.internalTable().targetReplicationGroupId(partId);
-                enlistedPartitions.computeIfAbsent(replicationGroupId, k -> 
new PendingTxPartitionEnlistment("UNUSED", 0))
-                        .addTableId(tableId);
-            }
+            cleaner.addEnlistment(tableId, partId);
         }
 
-        List<EnlistedPartitionGroup> enlistedPartitionGroups = 
enlistedPartitions.entrySet().stream()
-                .map(entry -> new EnlistedPartitionGroup(entry.getKey(), 
entry.getValue().tableIds()))
-                .collect(toList());
-
-        return txManager.discardLocalWriteIntents(enlistedPartitionGroups, 
txId).handle((res, err) -> null);
+        return cleaner.clean(false).handle((res, err) -> null);
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java
new file mode 100644
index 00000000000..d7843e7814e
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTxPartitionEnlistmentCleaner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.tx;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+
+/**
+ * Helper class to clean up direct transaction enlistments on the server side 
for a client connection.
+ */
+public class ClientTxPartitionEnlistmentCleaner {
+    private final UUID txId;
+
+    private final TxManager txManager;
+
+    private final IgniteTablesInternal igniteTables;
+
+    private final Map<ZonePartitionId, PendingTxPartitionEnlistment> 
enlistedPartitions = new ConcurrentHashMap<>();
+
+    /**
+     * Creates a new instance of the transaction partition enlistment cleaner.
+     *
+     * @param txId Transaction ID.
+     * @param txManager Transaction manager.
+     * @param igniteTables Ignite tables.
+     */
+    public ClientTxPartitionEnlistmentCleaner(UUID txId, TxManager txManager, 
IgniteTablesInternal igniteTables) {
+        this.txId = txId;
+        this.txManager = txManager;
+        this.igniteTables = igniteTables;
+    }
+
+    /**
+     * Adds a partition enlistment for the given table and partition.
+     *
+     * @param tableId Table ID.
+     * @param partId Partition ID.
+     */
+    public void addEnlistment(int tableId, int partId) {
+        TableViewInternal table = igniteTables.cachedTable(tableId);
+
+        if (table != null) {
+            ZonePartitionId replicationGroupId = 
table.internalTable().targetReplicationGroupId(partId);
+            enlistedPartitions.computeIfAbsent(replicationGroupId, k -> new 
PendingTxPartitionEnlistment("UNUSED", 0))
+                    .addTableId(tableId);
+        }
+    }
+
+    /**
+     * Discards local write intents for all enlisted partitions.
+     *
+     * @param abortTx Whether to abort the transaction as well.
+     * @return Future that completes when cleanup is done.
+     */
+    public CompletableFuture<Void> clean(boolean abortTx) {
+        List<EnlistedPartitionGroup> enlistedPartitionGroups = 
enlistedPartitions.entrySet().stream()
+                .map(entry -> new EnlistedPartitionGroup(entry.getKey(), 
entry.getValue().tableIds()))
+                .collect(toList());
+
+        return txManager.discardLocalWriteIntents(enlistedPartitionGroups, 
txId, abortTx);
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
index 6960cf781b1..fcceb3cfdb8 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.client;
 
+import static 
org.apache.ignite.internal.util.ExceptionUtils.existingCauseOrSuppressed;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
@@ -99,24 +101,6 @@ class ClientFutureUtils {
         });
     }
 
-    private static boolean existingCauseOrSuppressed(Throwable t, 
HashSet<Throwable> dejaVu) {
-        if (t == null) {
-            return false;
-        }
-
-        if (!dejaVu.add(t)) {
-            return true;
-        }
-
-        for (Throwable sup : t.getSuppressed()) {
-            if (existingCauseOrSuppressed(sup, dejaVu)) {
-                return true;
-            }
-        }
-
-        return existingCauseOrSuppressed(t.getCause(), dejaVu);
-    }
-
     static class RetryContext {
         int attempt;
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index f5730c3d4f0..e910a47f1e7 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -297,8 +297,7 @@ public class ClientTransaction implements Transaction {
             ClientChannel ch = reliableChannel.getNodeChannel(entry.getKey());
 
             if (ch == null) {
-                // Connection is lost, the transaction will be cleaned up by 
other means.
-                // TODO https://issues.apache.org/jira/browse/IGNITE-27651
+                // Connection is lost, the transaction will be cleaned up by 
the server.
                 continue;
             }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DEVNOTES.md 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DEVNOTES.md
new file mode 100644
index 00000000000..01f00885273
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DEVNOTES.md
@@ -0,0 +1,60 @@
+# Client Direct Transaction Coordinator
+
+The "lightweight client tx coordinator" is an optimization that allows clients to bypass the traditional coordinator-proxy pattern
+and communicate directly with partition owners.
+
+This requires existing active client connections to the partition owner nodes.
+
+## Two Operating Modes
+
+1. Proxy Mode (Traditional): Client → Coordinator → Partition Owners
+2. Direct Mapping Mode (Lightweight): Client → Partition Owners (directly)
+
+## Protocol
+
+1. Transaction Start (Piggybacked)
+- Request: TX_ID_FIRST_DIRECT = -1L embedded in first data operation (e.g., TUPLE_UPSERT)
+- Includes: Observable timestamp, read-only flag, timeout, options
+- Response: Transaction ID, UUID, coordinator ID, timeout
+- Location: ClientTableCommon#readTx
+
+2. Direct Operations
+- Request: TX_ID_DIRECT = 0L marker with enlistment metadata
+- Includes: Enlistment token, transaction UUID, commit table ID, commit partition ID, coordinator UUID
+- Sent to: Partition owner directly (not coordinator)
+- Location: DirectTxUtils.writeTx
+
+3. Transaction Commit
+- Operation: ClientOp.TX_COMMIT = 44
+- Includes: Resource ID + list of all direct enlistments (tableId, partitionId, consistentId, token)
+- Server merges client enlistments with server-side enlistments
+- Location: ClientTransactionCommitRequest.java
+
+4. Transaction Discard (Cleanup)
+- Operation: ClientOp.TX_DISCARD = 75
+- When: Transaction fails or rollback needed
+- Includes: Transaction UUID + list of (tableId, partitionId) pairs
+- Purpose: Clean up direct enlistments on partition owners
+- Location: ClientTransaction#sendDiscardRequests
+
+## Lifecycle
+
+1. Client creates lazy transaction (no network call)
+2. First operation triggers:
+    - DirectTxUtils.ensureStarted() checks if colocated
+    - DirectTxUtils.writeTx() writes TX_ID_FIRST_DIRECT (-1)
+    - Request sent directly to partition owner
+    - Server starts transaction and returns metadata
+    - DirectTxUtils.readTx() creates ClientTransaction
+
+3. Subsequent operations:
+    - DirectTxUtils.resolveChannel() picks partition owner
+    - ClientTransaction.enlistDirect() tracks enlistment
+    - DirectTxUtils.writeTx() writes TX_ID_DIRECT (0) + enlistment token
+    - Direct communication with partition owner
+
+4. Commit:
+    - Collects all enlistments from ClientTransaction.enlisted map
+    - Sends TX_COMMIT with enlistment metadata to coordinator
+    - Coordinator merges client + server enlistments
+    - Validates tokens and commits transaction
\ No newline at end of file
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 97183e24672..78f78cb2f2c 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
 import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import org.apache.ignite.tx.TransactionException;
@@ -286,7 +287,7 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
+    public CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId, 
boolean abortTx) {
         return nullCompletedFuture();
     }
 
@@ -295,6 +296,11 @@ public class FakeTxManager implements TxManager {
         return 0;
     }
 
+    @Override
+    public RemotelyTriggeredResourceRegistry resourceRegistry() {
+        return null;
+    }
+
     @Override
     public int finished() {
         return 0;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index 0162113cda3..4969432747e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -455,6 +455,31 @@ public final class ExceptionUtils {
         return false;
     }
 
+    /**
+     * Checks if the given throwable is already present in the cause or 
suppressed hierarchy of the given throwable.
+     *
+     * @param t Throwable.
+     * @param dejaVu Known exceptions.
+     * @return True if seen before, false otherwise.
+     */
+    public static boolean existingCauseOrSuppressed(@Nullable Throwable t, 
Set<Throwable> dejaVu) {
+        if (t == null) {
+            return false;
+        }
+
+        if (!dejaVu.add(t)) {
+            return true;
+        }
+
+        for (Throwable sup : t.getSuppressed()) {
+            if (existingCauseOrSuppressed(sup, dejaVu)) {
+                return true;
+            }
+        }
+
+        return existingCauseOrSuppressed(t.getCause(), dejaVu);
+    }
+
     /**
      * Unwraps exception cause from wrappers like CompletionException and 
ExecutionException.
      *
diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
index f8eac10274f..6e4a2cd4e34 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcConnectionFailoverTest.java
@@ -34,7 +34,6 @@ import 
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.jdbc.JdbcConnection;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -197,7 +196,6 @@ public class ItJdbcConnectionFailoverTest extends 
ClusterPerTestIntegrationTest
      * Ensures that the client receives a meaningful exception when the node 
holding the client transaction goes down.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27091")
     void testTransactionCannotBeUsedAfterNodeRestart() throws SQLException {
         int nodesCount = 3;
         cluster.startAndInit(nodesCount, new int[]{2});
@@ -221,8 +219,7 @@ public class ItJdbcConnectionFailoverTest extends 
ClusterPerTestIntegrationTest
                 cluster.startNode(1);
 
                 //noinspection ThrowableNotThrown
-                assertThrowsSqlException("Transaction context has been lost 
due to connection errors",
-                        () -> stmt.execute(dummyQuery));
+                assertThrowsSqlException("Transaction is already finished", () 
-> stmt.execute(dummyQuery));
             }
         }
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java
new file mode 100644
index 00000000000..bbae8717cdb
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionCleanupTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest.generateKeysForNode;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for client transaction cleanup on disconnect.
+ */
+@SuppressWarnings({"DataFlowIssue"})
+public class ItThinClientTransactionCleanupTest extends 
ItAbstractThinClientTest {
+    /**
+     * Tests that locks are released when client disconnects with a 
transaction having direct enlistments.
+     */
+    @Test
+    void testClientDisconnectReleasesTxLocksFast() {
+        try (IgniteClient client = 
IgniteClient.builder().addresses(getClientAddresses().toArray(new 
String[0])).build()) {
+            var table = (ClientTable) client.tables().table(TABLE_NAME);
+            Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicas();
+
+            IgniteImpl server0 = unwrapIgniteImpl(server(0));
+            IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+            List<Tuple> tuples0 = generateKeysForNode(300, 1, map, 
server0.cluster().localNode(), table);
+            List<Tuple> tuples1 = generateKeysForNode(310, 1, map, 
server1.cluster().localNode(), table);
+
+            // Perform a mix of operations to trigger direct tx logic.
+            Map<Tuple, Tuple> data = new HashMap<>();
+
+            data.put(tuples0.get(0), val(tuples0.get(0).intValue(0) + ""));
+            data.put(tuples1.get(0), val(tuples1.get(0).intValue(0) + ""));
+
+            ClientLazyTransaction tx0 = (ClientLazyTransaction) 
client.transactions().begin();
+
+            table.keyValueView().putAll(tx0, data);
+
+            for (Entry<Tuple, Tuple> entry : data.entrySet()) {
+                table.keyValueView().put(tx0, entry.getKey(), 
entry.getValue());
+            }
+
+            assertThat(txLockCount(), greaterThanOrEqualTo(2));
+
+            // Disconnect without commit or rollback.
+        }
+
+        await().atMost(Duration.ofSeconds(2))
+                .untilAsserted(() -> assertEquals(0, txLockCount()));
+    }
+
+    private int txLockCount() {
+        int count = 0;
+
+        for (int i = 0; i < nodes(); i++) {
+            count += txLockCount(server(i));
+        }
+
+        return count;
+    }
+
+    private static int txLockCount(Ignite server) {
+        IgniteImpl ignite = unwrapIgniteImpl(server);
+        LockManager lockManager = ignite.txManager().lockManager();
+        return CollectionUtils.count(lockManager.locks());
+    }
+
+    private static Tuple val(String v) {
+        return Tuple.create().set(COLUMN_VAL, v);
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 0544d860d8e..c113153d880 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
 import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import org.jetbrains.annotations.Nullable;
@@ -287,8 +288,9 @@ public interface TxManager extends IgniteComponent {
      *
      * @param groups Groups.
      * @param txId Transaction id.
+     * @param abortTx If {@code true}, the transaction will be aborted.
      */
-    CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId);
+    CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId, 
boolean abortTx);
 
     /**
      * Returns lock retry count.
@@ -297,6 +299,13 @@ public interface TxManager extends IgniteComponent {
      */
     int lockRetryCount();
 
+    /**
+     * Returns the resource registry.
+     *
+     * @return Resource registry.
+     */
+    RemotelyTriggeredResourceRegistry resourceRegistry();
+
     /**
      * Returns a number of finished transactions.
      *
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
index 8b9e77f7c99..ed6263fac0a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/OrphanDetector.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
@@ -25,7 +26,6 @@ import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
-import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
 
@@ -37,7 +37,6 @@ import java.util.function.Supplier;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -183,7 +182,7 @@ public class OrphanDetector {
 
             // We can path the work to another thread without any condition, because it is a very rare scenario in which the transaction
             // coordinator left topology.
-            partitionOperationsExecutor.execute(() -> sendTxRecoveryMessage(txState.commitPartitionId(), txId));
+            partitionOperationsExecutor.execute(() -> sendTxRecoveryMessageIgnoreErrors(txState.commitPartitionId(), txId));
         }
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
@@ -197,28 +196,9 @@ public class OrphanDetector {
      * @param cmpPartGrp Replication group of commit partition.
      * @param txId Transaction id.
      */
-    private void sendTxRecoveryMessage(ZonePartitionId cmpPartGrp, UUID txId) {
-        placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp)
-                .thenCompose(replicaMeta -> {
-                    InternalClusterNode commitPartPrimaryNode =
-                            replicaMeta != null ? topologyService.getByConsistentId(replicaMeta.getLeaseholder()) : null;
-
-                    if (commitPartPrimaryNode == null) {
-                        LOG.warn(
-                                "The primary replica of the commit partition is not available [commitPartGrp={}, tx={}]",
-                                cmpPartGrp,
-                                txId
-                        );
-
-                        return nullCompletedFuture();
-                    }
-
-                    return replicaService.invoke(commitPartPrimaryNode, TX_MESSAGES_FACTORY.txRecoveryMessage()
-                            .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, cmpPartGrp))
-                            .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
-                            .txId(txId)
-                            .build());
-                }).exceptionally(throwable -> {
+    private void sendTxRecoveryMessageIgnoreErrors(ZonePartitionId cmpPartGrp, UUID txId) {
+        sendTxRecoveryMessage(cmpPartGrp, txId)
+                .exceptionally(throwable -> {
                     if (throwable != null) {
                         LOG.warn("A recovery message for the transaction was handled with the error {}.",
                                 throwable, formatTxInfo(txId, txLocalStateStorage));
@@ -228,6 +208,38 @@ public class OrphanDetector {
                 });
     }
 
+    /**
+     * Sends transaction recovery message to commit partition for particular transaction.
+     *
+     * @param txId Transaction id.
+     * @return Future.
+     */
+    CompletableFuture<Object> sendTxRecoveryMessage(UUID txId) {
+        TxStateMeta txState = txLocalStateStorage.state(txId);
+
+        if (txState == null || isFinalState(txState.txState())) {
+            return completedFuture(null);
+        }
+
+        return sendTxRecoveryMessage(txState.commitPartitionId(), txId);
+    }
+
+    /**
+     * Sends transaction recovery message to commit partition for particular transaction.
+     *
+     * @param cmpPartGrp Replication group of commit partition.
+     * @param txId Transaction id.
+     * @return Future.
+     */
+    private CompletableFuture<Object> sendTxRecoveryMessage(ZonePartitionId cmpPartGrp, UUID txId) {
+        return placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(cmpPartGrp)
+                .thenCompose(replicaMeta -> replicaService.invoke(replicaMeta.getLeaseholder(), TX_MESSAGES_FACTORY.txRecoveryMessage()
+                        .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, cmpPartGrp))
+                        .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+                        .txId(txId)
+                        .build()));
+    }
+
     /**
      * Performs a life check for the transaction coordinator.
      *
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 5bbee2749f1..be03cc8af9b 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -241,6 +241,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
 
     private final ConcurrentLinkedQueue<CompletableFuture<?>> stopFuts = new ConcurrentLinkedQueue<>();
 
+    private final RemotelyTriggeredResourceRegistry resourcesRegistry;
+
     /**
      * Test-only constructor.
      *
@@ -366,6 +368,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
         this.commonScheduler = commonScheduler;
         this.failureProcessor = failureProcessor;
         this.metricsManager = metricManager;
+        this.resourcesRegistry = resourcesRegistry;
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, clockService);
 
@@ -1195,12 +1198,19 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
     }
 
     @Override
-    public CompletableFuture<Void> discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
-        return txCleanupRequestHandler.discardLocalWriteIntents(groups, txId).handle((r, e) -> {
-            // We don't need tx state any more.
-            updateTxMeta(txId, old -> null);
-            return null;
-        });
+    public CompletableFuture<Void> discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId, boolean abortTx) {
+        CompletableFuture<Object> f = nullCompletedFuture();
+
+        if (abortTx) {
+            f = orphanDetector.sendTxRecoveryMessage(txId);
+        }
+
+        return f.thenCompose(unused -> txCleanupRequestHandler.discardLocalWriteIntents(groups, txId))
+                .handle((r, e) -> {
+                    // We don't need tx state any more.
+                    updateTxMeta(txId, old -> null);
+                    return null;
+                });
     }
 
     @Override
@@ -1208,6 +1218,11 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
         return lockRetryCount;
     }
 
+    @Override
+    public RemotelyTriggeredResourceRegistry resourceRegistry() {
+        return resourcesRegistry;
+    }
+
     @Override
     public Executor writeIntentSwitchExecutor() {
         return writeIntentSwitchPool;
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
index 798d2fc6fe5..fd9e3f78106 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/OrphanDetectorTest.java
@@ -326,7 +326,7 @@ public class OrphanDetectorTest extends BaseIgniteAbstractTest {
         assertEquals(TxState.ABANDONED, orphanState.txState());
 
         // Send tx recovery message.
-        verify(replicaService).invoke(any(InternalClusterNode.class), any());
+        verify(replicaService).invoke(any(String.class), any());
 
         assertThat(acquire, willThrow(LockException.class, "Failed to acquire the abandoned lock due to a possible deadlock"));
 


Reply via email to