This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2699a72bcbe IGNITE-27089 Fix handling for delayed ack failures
2699a72bcbe is described below

commit 2699a72bcbe6b2cb2a10b3fdb8b63ec35ef90c63
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Nov 28 14:29:49 2025 +0300

    IGNITE-27089 Fix handling for delayed ack failures
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../internal/client/proto/ErrorExtensions.java     |   2 +
 .../handler/ClientInboundMessageHandler.java       |   6 +
 .../internal/client/ClientDelayedAckException.java |  58 +++
 .../ignite/internal/client/TcpClientChannel.java   |  29 +-
 .../internal/client/table/ClientTableMapUtils.java |  11 +-
 .../internal/client/tx/ClientTransaction.java      |  25 +-
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../ignite/internal/replicator/ReplicaManager.java |  19 +-
 .../internal/client/ItClientDirectMappingTest.java |  17 -
 .../app/client/ItAbstractThinClientTest.java       |  14 +-
 .../app/client/ItThinClientTransactionsTest.java   |  44 +--
 ...ClientTransactionsWithBrokenReplicatorTest.java | 389 +++++++++++++++++++++
 .../replicator/PartitionReplicaListener.java       |  18 +-
 .../ignite/internal/tx/DelayedAckException.java    |  52 +++
 .../tx/TransactionExceptionMapperProvider.java     |   2 +
 .../internal/tx/impl/TransactionInflights.java     |  64 +++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  16 +-
 20 files changed, 706 insertions(+), 68 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 9bd5cd73357..d62069fc500 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -453,6 +453,9 @@ public class ErrorGroups {
 
         /** Operation failed because the transaction is already finished with 
timeout. */
         public static final int TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 16);
+
+        /** Operation failed due to replication delayed ack failure. */
+        public static final int TX_DELAYED_ACK_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 17);
     }
 
     /** Replicator error group. */
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index 3e4229ec18f..c7ce644f608 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -24,4 +24,6 @@ public class ErrorExtensions {
     public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
 
     public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
+
+    public static final String DELAYED_ACK = "delayed-ack";
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 01dea112d25..0432a9934c7 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -166,6 +166,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
+import org.apache.ignite.internal.tx.DelayedAckException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.CancelHandle;
@@ -686,6 +687,7 @@ public class ClientInboundMessageHandler
     private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
         SchemaVersionMismatchException schemaVersionMismatchException = 
findException(err, SchemaVersionMismatchException.class);
         SqlBatchException sqlBatchException = findException(err, 
SqlBatchException.class);
+        DelayedAckException delayedAckException = findException(err, 
DelayedAckException.class);
 
         err = firstNotNull(
                 schemaVersionMismatchException,
@@ -727,6 +729,10 @@ public class ClientInboundMessageHandler
             packer.packInt(1); // 1 extension.
             packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
             packer.packLongArray(sqlBatchException.updateCounters());
+        } else if (delayedAckException != null) {
+            packer.packInt(1); // 1 extension.
+            packer.packString(ErrorExtensions.DELAYED_ACK);
+            packer.packUuid(delayedAckException.txId());
         } else {
             packer.packNil(); // No extensions.
         }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
new file mode 100644
index 00000000000..ef9e06513b5
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.UUID;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holds the transaction id and the cause for delayed replication ack failure.
+ */
+public class ClientDelayedAckException extends IgniteInternalException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Transaction id. */
+    private final UUID txId;
+
+    /**
+     * Constructor.
+     *
+     * @param traceId Trace ID.
+     * @param code Error code.
+     *
+     * @param message String message.
+     * @param txId Related transaction id.
+     * @param cause Cause.
+     */
+    ClientDelayedAckException(UUID traceId, int code, @Nullable String 
message, UUID txId, @Nullable Throwable cause) {
+        super(traceId, code, message, cause);
+
+        this.txId = txId;
+    }
+
+    /**
+     * Gets the id of the transaction related to the delayed ack failure.
+     *
+     * @return Transaction id.
+     */
+    public UUID txId() {
+        return txId;
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 2eae304895e..1d86556a9b3 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -75,6 +75,7 @@ import org.apache.ignite.lang.ErrorGroups.Table;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -590,6 +591,17 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
         if (handler == null) {
             // Default notification handler. Used to deliver delayed 
replication acks.
+            if (err != null) {
+                if (err instanceof ClientDelayedAckException) {
+                    ClientDelayedAckException err0 = 
(ClientDelayedAckException) err;
+
+                    inflights.removeInflight(err0.txId(), new 
TransactionException(err0.code(), err0.getMessage(), err0.getCause()));
+                }
+
+                // Can't do anything to remove stuck inflight.
+                return;
+            }
+
             UUID txId = unpacker.unpackUuid();
             inflights.removeInflight(txId, err);
 
@@ -617,6 +629,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
         int expectedSchemaVersion = -1;
         long[] sqlUpdateCounters = null;
+        UUID txId = null;
 
         for (int i = 0; i < extSize; i++) {
             String key = unpacker.unpackString();
@@ -625,12 +638,23 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 expectedSchemaVersion = unpacker.unpackInt();
             } else if (key.equals(ErrorExtensions.SQL_UPDATE_COUNTERS)) {
                 sqlUpdateCounters = unpacker.unpackLongArray();
+            } else if (key.equals(ErrorExtensions.DELAYED_ACK)) {
+                txId = unpacker.unpackUuid();
             } else {
                 // Unknown extension - ignore.
                 unpacker.skipValues(1);
             }
         }
 
+        if (txId != null) {
+            return new ClientDelayedAckException(traceId, code, errMsg, txId, 
causeWithStackTrace);
+        }
+
+        if (sqlUpdateCounters != null) {
+            errMsg = errMsg != null ? errMsg : "SQL batch execution error";
+            return new SqlBatchException(traceId, code, sqlUpdateCounters, 
errMsg, causeWithStackTrace);
+        }
+
         if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
             if (expectedSchemaVersion == -1) {
                 return new IgniteException(
@@ -640,11 +664,6 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             return new ClientSchemaVersionMismatchException(traceId, code, 
errMsg, expectedSchemaVersion, causeWithStackTrace);
         }
 
-        if (sqlUpdateCounters != null) {
-            errMsg = errMsg != null ? errMsg : "SQL batch execution error";
-            return new SqlBatchException(traceId, code, sqlUpdateCounters, 
errMsg, causeWithStackTrace);
-        }
-
         try {
             Class<? extends Throwable> errCls = (Class<? extends Throwable>) 
Class.forName(errClassName);
             return copyExceptionWithCause(errCls, traceId, code, errMsg, 
causeWithStackTrace);
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
index 826b08289e4..49f9f6ce3b6 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import org.apache.ignite.client.IgniteClientConnectionException;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.util.CompletableFutures;
@@ -126,7 +127,10 @@ class ClientTableMapUtils {
             }
             Transaction tx0 = txns.get(i);
             tx0.rollbackAsync().exceptionally(e -> {
-                log.error("Failed to rollback a transactional batch: [tx=" + 
tx0 + ']', e);
+                Throwable cause = unwrapCause(e);
+                if (!(cause instanceof IgniteClientConnectionException)) {
+                    log.error("Failed to rollback a transactional batch: [tx=" 
+ tx0 + ']', cause);
+                }
                 return null;
             });
         }
@@ -141,7 +145,10 @@ class ClientTableMapUtils {
         for (Transaction txn : txns) {
             ClientLazyTransaction tx0 = (ClientLazyTransaction) txn;
             CompletableFuture<Void> fut = tx0.commitAsync().exceptionally(e -> 
{
-                log.error("Failed to commit a transactional batch: [tx=" + tx0 
+ ']', e);
+                Throwable cause = unwrapCause(e);
+                if (!(cause instanceof IgniteClientConnectionException)) {
+                    log.error("Failed to commit a transactional batch: [tx=" + 
tx0 + ']', cause);
+                }
                 return null;
             });
             // Enforce sync commit to avoid lock conflicts when working in 
compatibility mode.
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 2eceadf027f..74a7d8c3ac4 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.client.tx;
 
+import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -221,13 +222,27 @@ public class ClientTransaction implements Transaction {
         boolean enabled = 
ch.protocolContext().isFeatureSupported(TX_PIGGYBACK);
         CompletableFuture<Void> finishFut = enabled ? 
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
 
-        CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored 
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
-            w.out().packLong(id);
+        CompletableFuture<Void> mainFinishFut = finishFut.handle((ignored, e) 
-> {
+            if (e != null) {
+                ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> {
+                    w.out().packLong(id);
 
-            if (!isReadOnly && enabled) {
-                packEnlisted(w);
+                    if (!isReadOnly && enabled) {
+                        packEnlisted(w);
+                    }
+                }, r -> null);
+
+                return CompletableFuture.<Void>failedFuture(e);
             }
-        }, r -> null));
+
+            return ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
+                w.out().packLong(id);
+
+                if (!isReadOnly && enabled) {
+                    packEnlisted(w);
+                }
+            }, r -> (Void) null);
+        }).thenCompose(identity());
 
         mainFinishFut.handle((res, e) -> {
             setState(STATE_COMMITTED);
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index f6d47e1ea33..ea8036ad0ec 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -137,6 +137,7 @@ enum class code : underlying_t {
     TX_STALE_OPERATION = 0x7000e,
     TX_STALE_READ_ONLY_OPERATION = 0x7000f,
     TX_ALREADY_FINISHED_WITH_TIMEOUT = 0x70010,
+    TX_DELAYED_ACK = 0x70011,
 
     // Replicator group. Group code: 8
     REPLICA_COMMON = 0x80001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index ff092b83a14..4eefe311cf4 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -208,6 +208,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::TX_STALE_OPERATION:
         case error::code::TX_STALE_READ_ONLY_OPERATION:
         case error::code::TX_ALREADY_FINISHED_WITH_TIMEOUT:
+        case error::code::TX_DELAYED_ACK:
             return sql_state::S25000_INVALID_TRANSACTION_STATE;
 
         // Replicator group. Group code: 8
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 547daee778a..8355f91e01c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -364,6 +364,9 @@ namespace Apache.Ignite
 
             /// <summary> TxAlreadyFinishedWithTimeout error. </summary>
             public const int TxAlreadyFinishedWithTimeout = (GroupCode << 16) 
| (16 & 0xFFFF);
+
+            /// <summary> TxDelayedAck error. </summary>
+            public const int TxDelayedAck = (GroupCode << 16) | (17 & 0xFFFF);
         }
 
         /// <summary> Replicator errors. </summary>
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 57aa60e5242..9d11528e820 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -420,7 +420,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                                 LOG.debug("Sending delayed response for 
replica request [request={}]", request);
 
                                 if (ex0 == null) {
-                                    msg0 = 
prepareReplicaResponse(sendTimestamp, new ReplicaResult(res0, null));
+                                    msg0 = 
prepareDelayedReplicaResponse(sendTimestamp, res0);
                                 } else {
                                     if (indicatesUnexpectedProblem(ex0)) {
                                         LOG.warn("Failed to process delayed 
response [request={}]", ex0, request);
@@ -1104,7 +1104,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             return REPLICA_MESSAGES_FACTORY
                     .errorTimestampAwareReplicaResponse()
                     .throwable(ex)
-                    .timestamp(clockService.now())
+                    .timestamp(clockService.current())
                     .build();
         } else {
             return REPLICA_MESSAGES_FACTORY
@@ -1114,6 +1114,21 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         }
     }
 
+    private NetworkMessage prepareDelayedReplicaResponse(boolean 
sendTimestamp, Object result) {
+        if (sendTimestamp) {
+            return REPLICA_MESSAGES_FACTORY
+                    .timestampAwareReplicaResponse()
+                    .result(result)
+                    .timestamp(clockService.current())
+                    .build();
+        } else {
+            return REPLICA_MESSAGES_FACTORY
+                    .replicaResponse()
+                    .result(result)
+                    .build();
+        }
+    }
+
     /**
      * Idle safe time sync for replicas.
      */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
index d4f87d39755..b671b25ca24 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
@@ -23,13 +23,9 @@ import static 
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTx;
 import static 
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTxVoid;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.function.Predicate;
-import java.util.stream.IntStream;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
-import org.apache.ignite.internal.TestWrappers;
-import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.client.table.ClientTable;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
@@ -129,19 +125,6 @@ public class ItClientDirectMappingTest extends 
ClusterPerTestIntegrationTest {
         return NODE_BOOTSTRAP_CFG_TEMPLATE;
     }
 
-    private IgniteImpl findNode(int startRange, int endRange, 
Predicate<IgniteImpl> filter) {
-        return IntStream.range(startRange, endRange)
-                .mapToObj(this::node)
-                .map(TestWrappers::unwrapIgniteImpl)
-                .filter(filter::test)
-                .findFirst()
-                .get();
-    }
-
-    private IgniteImpl findNodeByName(String leaseholder) {
-        return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
-    }
-
     @Override
     protected int initialNodes() {
         return 2;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 243819b87c6..f9d20b46339 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.compute.BroadcastJobTarget;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.internal.raft.configuration.RaftConfigurationSchema;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -66,6 +67,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @ExtendWith(WorkDirectoryExtension.class)
 public abstract class ItAbstractThinClientTest extends BaseIgniteAbstractTest {
+    protected static final String ZONE_NAME = "TEST_ZONE";
+
     protected static final String TABLE_NAME = "TBL1";
 
     protected static final String COLUMN_KEY = "key";
@@ -97,8 +100,9 @@ public abstract class ItAbstractThinClientTest extends 
BaseIgniteAbstractTest {
                             + (i == 1 ? ("  
clientConnector.sendServerExceptionStackTraceToClient: true\n"
                             + "  clientConnector.metricsEnabled: true\n") : "")
                             + "  clientConnector.port: " + (10800 + i) + ",\n"
-                            + "  rest.port: " + (10300 + i) + "\n"
-                            + "  compute.threadPoolSize: 1\n"
+                            + "  rest.port: " + (10300 + i) + ",\n"
+                            + "  compute.threadPoolSize: 1,\n"
+                            + "  raft.retryTimeoutMillis: " + 
raftTimeoutMillis()
                             + "}"
             );
         }
@@ -124,7 +128,7 @@ public abstract class ItAbstractThinClientTest extends 
BaseIgniteAbstractTest {
 
         IgniteSql sql = startedNodes.get(0).sql();
 
-        sql.execute(null, "CREATE ZONE TEST_ZONE (REPLICAS " + replicas() + ", 
PARTITIONS " + PARTITIONS + ") STORAGE PROFILES ['"
+        sql.execute(null, "CREATE ZONE " + ZONE_NAME + " (REPLICAS " + 
replicas() + ", PARTITIONS " + PARTITIONS + ") STORAGE PROFILES ['"
                 + DEFAULT_STORAGE_PROFILE + "']");
         sql.execute(null, "CREATE TABLE " + TABLE_NAME + "("
                 + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) 
ZONE TEST_ZONE");
@@ -187,6 +191,10 @@ public abstract class ItAbstractThinClientTest extends 
BaseIgniteAbstractTest {
         return DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
     }
 
+    protected long raftTimeoutMillis() {
+        return new RaftConfigurationSchema().retryTimeoutMillis;
+    }
+
     protected IgniteClient client() {
         return client;
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 2d31607b24b..2d94a465045 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.runner.app.client;
 
 import static java.util.Collections.emptyList;
 import static java.util.Comparator.comparing;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.startsWith;
@@ -49,7 +50,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.ClientTransactionInflights;
@@ -450,10 +450,10 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         assertTrue(coordIdx != -1);
 
-        IgniteImpl coord = TestWrappers.unwrapIgniteImpl(server(coordIdx));
+        IgniteImpl coord = unwrapIgniteImpl(server(coordIdx));
         assertNotNull(coord.txManager().stateMeta(txId), "Transaction expected 
to be colocated with enlistment");
 
-        IgniteImpl other = TestWrappers.unwrapIgniteImpl(server(1 - coordIdx));
+        IgniteImpl other = unwrapIgniteImpl(server(1 - coordIdx));
 
         do {
             k++;
@@ -620,7 +620,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientLazyTransaction tx0 = (ClientLazyTransaction) 
client().transactions().begin();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
 
         List<Tuple> tuples0 = generateKeysForNode(200, 2, map, 
server0.cluster().localNode(), table);
 
@@ -650,8 +650,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(300, 1, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(310, 1, map, 
server1.cluster().localNode(), table);
@@ -682,8 +682,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(400, 1, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(410, 1, map, 
server1.cluster().localNode(), table);
@@ -731,8 +731,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(500, 1, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(510, 80, map, 
server1.cluster().localNode(), table);
@@ -858,8 +858,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(600, 50, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(610, 50, map, 
server1.cluster().localNode(), table);
@@ -889,8 +889,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(600, 2, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(610, 1, map, 
server1.cluster().localNode(), table);
@@ -954,8 +954,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> keys0 = generateKeysForNode(600, 2, map, 
server0.cluster().localNode(), table);
         List<Tuple> keys1 = generateKeysForNode(610, 1, map, 
server1.cluster().localNode(), table);
@@ -1019,8 +1019,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         ClientTable table = (ClientTable) table();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(700, 50, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(710, 50, map, 
server1.cluster().localNode(), table);
@@ -1097,8 +1097,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         // Load partition map to ensure all entries are directly mapped.
         Map<Partition, ClusterNode> map = 
table.partitionManager().primaryReplicasAsync().join();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(600, 20, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(600, 20, map, 
server1.cluster().localNode(), table);
@@ -1135,8 +1135,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         // Load partition map to ensure all entries are directly mapped.
         Map<Partition, ClusterNode> map = 
table.partitionManager().primaryReplicasAsync().join();
 
-        IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
-        IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
 
         List<Tuple> tuples0 = generateKeysForNode(600, 20, map, 
server0.cluster().localNode(), table);
         List<Tuple> tuples1 = generateKeysForNode(600, 20, map, 
server1.cluster().localNode(), table);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithBrokenReplicatorTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithBrokenReplicatorTest.java
new file mode 100644
index 00000000000..18da22a1173
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithBrokenReplicatorTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static 
org.apache.ignite.internal.ReplicationGroupsUtils.zonePartitionIds;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest.generateKeysForPartition;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.AppendEntriesRequestImpl;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import 
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests thin client transaction failure handling with a broken replicator.
+ */
+public class ItThinClientTransactionsWithBrokenReplicatorTest extends 
ItAbstractThinClientTest {
+    @Override
+    protected long raftTimeoutMillis() {
+        return TimeUnit.SECONDS.toMillis(2); // Set small retry timeout to 
reduce the test execution time.
+    }
+
+    @Test
+    public void testErrorDuringDirectMappingSinglePartitionTransaction() {
+        Map<Partition, ClusterNode> map = 
table().partitionManager().primaryReplicasAsync().join();
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+        List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+        ZonePartitionId part0 = replicationGroupIds.get(0);
+
+        List<Tuple> tuples0 = generateKeysForPartition(100, 1, map, 
part0.partitionId(), table);
+
+        FaultyAppendEntriesRequestProcessor proc0 = 
installFaultyAppendEntriesProcessor(server0);
+        proc0.setFaultyGroup(part0);
+
+        FaultyAppendEntriesRequestProcessor proc1 = 
installFaultyAppendEntriesProcessor(server1);
+        proc1.setFaultyGroup(part0);
+
+        KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+        Transaction tx = client().transactions().begin();
+        tupleView.put(tx, tuples0.get(0), val(tuples0.get(0).intValue(0) + 
""));
+
+        try {
+            tx.commit();
+            fail("Expecting commit failure");
+        } catch (TransactionException exception) {
+            assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+        }
+
+        proc0.setFaultyGroup(null);
+        proc1.setFaultyGroup(null);
+
+        assertNull(tupleView.get(null, tuples0.get(0)));
+    }
+
+    @Test
+    public void testErrorDuringDirectMappingSinglePartitionTransactionBatch() {
+        Map<Partition, ClusterNode> map = 
table().partitionManager().primaryReplicasAsync().join();
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+        List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+        ZonePartitionId part0 = replicationGroupIds.get(0);
+
+        List<Tuple> tuples0 = generateKeysForPartition(200, 2, map, 
part0.partitionId(), table);
+
+        Map<Tuple, Tuple> batch = new HashMap<>();
+
+        for (Tuple tup : tuples0) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        FaultyAppendEntriesRequestProcessor proc0 = 
installFaultyAppendEntriesProcessor(server0);
+        proc0.setFaultyGroup(part0);
+
+        FaultyAppendEntriesRequestProcessor proc1 = 
installFaultyAppendEntriesProcessor(server1);
+        proc1.setFaultyGroup(part0);
+
+        KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+        Transaction tx = client().transactions().begin();
+        tupleView.putAll(tx, batch);
+
+        try {
+            tx.commit();
+            fail("Expecting commit failure");
+        } catch (TransactionException exception) {
+            assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+        }
+
+        proc0.setFaultyGroup(null);
+        proc1.setFaultyGroup(null);
+
+        assertTrue(tupleView.getAll(null, batch.keySet()).isEmpty());
+    }
+
+    @Test
+    public void testErrorDuringDirectMappingTwoPartitionTransaction() {
+        Map<Partition, ClusterNode> map = 
table().partitionManager().primaryReplicasAsync().join();
+        Map<Integer, ClusterNode> mapPartById = 
map.entrySet().stream().collect(Collectors.toMap(
+                entry -> ((HashPartition) entry.getKey()).partitionId(),
+                Entry::getValue
+        ));
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+        List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+        ZonePartitionId part0 = replicationGroupIds.get(0);
+        ClusterNode firstNode = mapPartById.get(part0.partitionId());
+
+        ZonePartitionId part1 = null;
+
+        // We need to find a partition on another node.
+        for (int i = 1; i < replicationGroupIds.size(); i++) {
+            ZonePartitionId tmp = replicationGroupIds.get(i);
+            ClusterNode otherNode = mapPartById.get(tmp.partitionId());
+            if (!otherNode.equals(firstNode)) {
+                part1 = tmp;
+                break;
+            }
+        }
+
+        assertNotNull(part1);
+
+        List<Tuple> tuples0 = generateKeysForPartition(300, 1, map, 
part0.partitionId(), table);
+        List<Tuple> tuples1 = generateKeysForPartition(310, 1, map, 
part1.partitionId(), table);
+
+        FaultyAppendEntriesRequestProcessor proc0 = 
installFaultyAppendEntriesProcessor(server0);
+        proc0.setFaultyGroup(part1);
+
+        FaultyAppendEntriesRequestProcessor proc1 = 
installFaultyAppendEntriesProcessor(server1);
+        proc1.setFaultyGroup(part1);
+
+        KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+        Transaction tx = client().transactions().begin();
+
+        Map<Tuple, Tuple> batch = new LinkedHashMap<>();
+
+        for (Tuple tup : tuples0) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        for (Tuple tup : tuples1) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        Iterator<Entry<Tuple, Tuple>> iter = batch.entrySet().iterator();
+        Entry<Tuple, Tuple> first = iter.next();
+
+        tupleView.put(tx, first.getKey(), first.getValue());
+
+        // Directly mapped request, will cause client inflight failure.
+        Entry<Tuple, Tuple> second = iter.next();
+        tupleView.put(tx, second.getKey(), second.getValue());
+
+        try {
+            tx.commit();
+            fail("Expecting commit failure");
+        } catch (TransactionException exception) {
+            assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+        }
+
+        proc0.setFaultyGroup(null);
+        proc1.setFaultyGroup(null);
+
+        assertTrue(tupleView.getAll(null, batch.keySet()).isEmpty());
+    }
+
+    @Test
+    public void testErrorDuringDirectMappingTwoPartitionTransactionColocated() 
{
+        Map<Partition, ClusterNode> map = 
table().partitionManager().primaryReplicasAsync().join();
+        Map<Integer, ClusterNode> mapPartById = 
map.entrySet().stream().collect(Collectors.toMap(
+                entry -> ((HashPartition) entry.getKey()).partitionId(),
+                Entry::getValue
+        ));
+
+        ClientTable table = (ClientTable) table();
+
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+        List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+        ZonePartitionId part0 = replicationGroupIds.get(0);
+        ClusterNode firstNode = mapPartById.get(part0.partitionId());
+
+        ZonePartitionId part1 = null;
+
+        // We need to find a partition on the same node.
+        for (int i = 1; i < replicationGroupIds.size(); i++) {
+            ZonePartitionId tmp = replicationGroupIds.get(i);
+            ClusterNode otherNode = mapPartById.get(tmp.partitionId());
+            if (otherNode.equals(firstNode)) {
+                part1 = tmp;
+                break;
+            }
+        }
+
+        assertNotNull(part1);
+
+        List<Tuple> tuples0 = generateKeysForPartition(400, 1, map, 
part0.partitionId(), table);
+        List<Tuple> tuples1 = generateKeysForPartition(410, 1, map, 
part1.partitionId(), table);
+
+        FaultyAppendEntriesRequestProcessor proc0 = 
installFaultyAppendEntriesProcessor(server0);
+        proc0.setFaultyGroup(part1);
+
+        FaultyAppendEntriesRequestProcessor proc1 = 
installFaultyAppendEntriesProcessor(server1);
+        proc1.setFaultyGroup(part1);
+
+        KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+        Transaction tx = client().transactions().begin();
+
+        Map<Tuple, Tuple> batch = new LinkedHashMap<>();
+
+        for (Tuple tup : tuples0) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        for (Tuple tup : tuples1) {
+            batch.put(tup, val(tup.intValue(0) + ""));
+        }
+
+        Iterator<Entry<Tuple, Tuple>> iter = batch.entrySet().iterator();
+        Entry<Tuple, Tuple> first = iter.next();
+
+        tupleView.put(tx, first.getKey(), first.getValue());
+
+        // Colocated request, will cause server inflight failure.
+        Entry<Tuple, Tuple> second = iter.next();
+        tupleView.put(tx, second.getKey(), second.getValue());
+
+        try {
+            tx.commit();
+            fail("Expecting commit failure");
+        } catch (TransactionException exception) {
+            assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+        }
+
+        proc0.setFaultyGroup(null);
+        proc1.setFaultyGroup(null);
+
+        assertTrue(tupleView.getAll(null, batch.keySet()).isEmpty());
+    }
+
+    private Table table() {
+        return client().tables().tables().get(0);
+    }
+
+    private static Tuple val(String v) {
+        return Tuple.create().set(COLUMN_VAL, v);
+    }
+
+    private static Tuple key(Integer k) {
+        return Tuple.create().set(COLUMN_KEY, k);
+    }
+
+    @Override
+    protected int replicas() {
+        return 2;
+    }
+
+    private static FaultyAppendEntriesRequestProcessor 
installFaultyAppendEntriesProcessor(IgniteImpl node) {
+        RaftServer raftServer = node.raftManager().server();
+        RpcServer<?> rpcServer = getFieldValue(raftServer, 
JraftServerImpl.class, "rpcServer");
+        Map<String, RpcProcessor<?>> processors = getFieldValue(rpcServer, 
IgniteRpcServer.class, "processors");
+
+        AppendEntriesRequestProcessor originalProcessor =
+                (AppendEntriesRequestProcessor) 
processors.get(AppendEntriesRequest.class.getName());
+        Executor appenderExecutor = getFieldValue(originalProcessor, 
RpcRequestProcessor.class, "executor");
+        RaftMessagesFactory raftMessagesFactory = 
getFieldValue(originalProcessor, RpcRequestProcessor.class, "msgFactory");
+
+        FaultyAppendEntriesRequestProcessor blockingProcessor = new 
FaultyAppendEntriesRequestProcessor(
+                appenderExecutor,
+                raftMessagesFactory
+        );
+
+        rpcServer.registerProcessor(blockingProcessor);
+
+        return blockingProcessor;
+    }
+
+    private static class FaultyAppendEntriesRequestProcessor extends 
AppendEntriesRequestProcessor {
+        private volatile @Nullable String partId;
+
+        FaultyAppendEntriesRequestProcessor(Executor executor, 
RaftMessagesFactory msgFactory) {
+            super(executor, msgFactory);
+        }
+
+        @Override
+        public Message processRequest0(RaftServerService service, 
AppendEntriesRequest request, RpcRequestClosure done) {
+            boolean isHeartbeat = JRaftUtils.isHeartbeatRequest(request);
+
+            if (partId != null && partId.equals(request.groupId()) && 
!isHeartbeat) {
+                // This response puts the replicator into an endless retry loop.
+                return RaftRpcFactory.DEFAULT //
+                        .newResponse(done.getMsgFactory(), RaftError.EINTERNAL,
+                                "Fail AppendEntries on '%s'.", 
request.groupId());
+            }
+
+            return super.processRequest0(service, request, done);
+        }
+
+        void setFaultyGroup(@Nullable ZonePartitionId partId) {
+            this.partId = partId == null ? null : partId.toString();
+        }
+
+        @Override
+        public String interest() {
+            return AppendEntriesRequestImpl.class.getName();
+        }
+    }
+
+    private static List<ZonePartitionId> getPartitions(IgniteImpl server) {
+        Catalog catalog = 
server.catalogManager().catalog(server.catalogManager().latestCatalogVersion());
+        CatalogZoneDescriptor zoneDescriptor = 
catalog.zone(ZONE_NAME.toUpperCase());
+        List<ZonePartitionId> replicationGroupIds = zonePartitionIds(server, 
zoneDescriptor.id());
+        return replicationGroupIds;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fb33446034a..a0cd0633951 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -43,6 +43,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollection
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
 import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -170,6 +171,7 @@ import 
org.apache.ignite.internal.table.distributed.TableUtils;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.tx.DelayedAckException;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
@@ -2296,7 +2298,13 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                     );
                 }
 
-                CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
+                CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
+                    if (e != null) {
+                        throw new DelayedAckException(cmd.txId(), 
unwrapCause(e));
+                    }
+
+                    return cmd.txId();
+                });
 
                 return completedFuture(new CommandApplicationResult(null, 
repFut));
             }
@@ -2429,7 +2437,13 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
                 );
             }
 
-            CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
+            CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
+                if (e != null) {
+                    throw new DelayedAckException(cmd.txId(), unwrapCause(e));
+                }
+
+                return cmd.txId();
+            });
 
             return completedFuture(new CommandApplicationResult(null, repFut));
         } else {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DelayedAckException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DelayedAckException.java
new file mode 100644
index 00000000000..decab36b942
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DelayedAckException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_DELAYED_ACK_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Holds the transaction id and the cause of a delayed replication ack failure.
+ */
+public class DelayedAckException extends IgniteInternalException {
+    private static final long serialVersionUID = 0L;
+
+    private final UUID txId;
+
+    /**
+     * Create the exception with txId and cause.
+     *
+     * @param txId The transaction id.
+     * @param cause The cause.
+     */
+    public DelayedAckException(UUID txId, Throwable cause) {
+        super(TX_DELAYED_ACK_ERR, "Failed to commit the transaction due to 
failed replication acknowledgement [txId=" + txId + ']', cause);
+        this.txId = txId;
+    }
+
+    /**
+     * Get the transaction id.
+     *
+     * @return The id.
+     */
+    public UUID txId() {
+        return txId;
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
index 35af89caa08..da5311e8da7 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
@@ -46,6 +46,8 @@ public class TransactionExceptionMapperProvider implements 
IgniteExceptionMapper
                 err -> new 
MismatchingTransactionOutcomeException(err.traceId(), err.code(), 
err.getMessage(), err)));
         mappers.add(unchecked(IncompatibleSchemaAbortException.class,
                 err -> new IncompatibleSchemaException(err.traceId(), 
err.code(), err.getMessage(), err)));
+        mappers.add(unchecked(DelayedAckException.class,
+                err -> new TransactionException(err.traceId(), err.code(), 
err.getMessage(), err.getCause())));
 
         return mappers;
     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 5e8f4e4511f..c6c79c1c89c 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -170,7 +170,26 @@ public class TransactionInflights {
 
         // Avoid completion under lock.
         if (tuple != null) {
-            tuple.onInflightsRemoved();
+            tuple.onInflightRemoved(tuple.err);
+        }
+    }
+
+    void removeInflight(UUID txId, Throwable cause) {
+        // Can be null if tx was aborted and inflights were removed from the 
collection.
+        TxContext tuple = txCtxMap.computeIfPresent(txId, (uuid, ctx) -> {
+            if (cause != null && ctx.err == null) {
+                ctx.err = cause; // Retain only first exception.
+            }
+
+            // Update inflight counter after assigning error value to avoid 
issues with visibility.
+            ctx.removeInflight(txId);
+
+            return ctx;
+        });
+
+        // Avoid completion under lock.
+        if (tuple != null) {
+            tuple.onInflightRemoved(tuple.err);
         }
     }
 
@@ -251,6 +270,7 @@ public class TransactionInflights {
 
     abstract static class TxContext {
         volatile long inflights = 0; // Updated under lock.
+        Throwable err;
 
         boolean addInflight() {
             if (isTxFinishing()) {
@@ -269,7 +289,7 @@ public class TransactionInflights {
             inflights--;
         }
 
-        abstract void onInflightsRemoved();
+        abstract void onInflightRemoved(@Nullable Throwable t);
 
         abstract void finishTx(@Nullable Map<ReplicationGroupId, 
PendingTxPartitionEnlistment> enlistedGroups);
 
@@ -293,7 +313,7 @@ public class TransactionInflights {
         }
 
         @Override
-        public void onInflightsRemoved() {
+        void onInflightRemoved(Throwable t) {
             // No-op.
         }
 
@@ -337,12 +357,29 @@ public class TransactionInflights {
         }
 
         CompletableFuture<Void> performFinish(boolean commit, 
Function<Boolean, CompletableFuture<Void>> finishAction) {
-            waitReadyToFinish(commit).whenComplete((ignoredReadyToFinish, 
readyException) -> {
+            waitReadyToFinish(commit).whenComplete((ignored, readyException) 
-> {
                 try {
-                    CompletableFuture<Void> actionFut = 
finishAction.apply(commit && readyException == null);
+                    if (commit) {
+                        if (readyException == null) {
+                            CompletableFuture<Void> actionFut = 
finishAction.apply(true);
+
+                            actionFut.whenComplete((ignoredFinishActionResult, 
finishException) ->
+                                    completeFinishInProgressFuture(true, null, 
finishException));
+                        } else {
+                            // If we got a ready exception, that means some of the 
enlisted partitions could be broken/unavailable.
+                            // Respond to the caller with the commit failure 
immediately to reduce the potential unavailability window.
+                            completeFinishInProgressFuture(true, 
readyException, null);
+
+                            finishAction.apply(false);
+                        }
+
+                        return;
+                    }
+
+                    CompletableFuture<Void> actionFut = 
finishAction.apply(false);
 
                     actionFut.whenComplete((ignoredFinishActionResult, 
finishException) ->
-                            completeFinishInProgressFuture(commit, 
readyException, finishException));
+                            completeFinishInProgressFuture(false, 
readyException, finishException));
                 } catch (Throwable err) {
                     completeFinishInProgressFuture(commit, readyException, 
err);
                 }
@@ -407,8 +444,13 @@ public class TransactionInflights {
         }
 
         private CompletableFuture<Void> waitNoInflights() {
+            // No new inflights are possible because the tx is locked for update.
             if (inflights == 0) {
-                waitRepFut.complete(null);
+                if (err != null) {
+                    waitRepFut.completeExceptionally(err);
+                } else {
+                    waitRepFut.complete(null);
+                }
             }
             return waitRepFut;
         }
@@ -418,9 +460,13 @@ public class TransactionInflights {
         }
 
         @Override
-        public void onInflightsRemoved() {
+        void onInflightRemoved(@Nullable Throwable t) {
             if (inflights == 0 && finishInProgressFuture != null) {
-                waitRepFut.complete(null);
+                if (t == null) {
+                    waitRepFut.complete(null);
+                } else {
+                    waitRepFut.completeExceptionally(t);
+                }
             }
         }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 83231bc39a0..f366de3c722 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -97,6 +97,7 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.systemview.api.SystemView;
 import org.apache.ignite.internal.systemview.api.SystemViewProvider;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.tx.DelayedAckException;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.LocalRwTxCounter;
@@ -1213,8 +1214,21 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             return;
         }
 
-        // Ignore error responses here. A transaction will be rolled back in 
other place.
+        // Only remove the failed inflight here. The transaction will be 
rolled back elsewhere.
         if (message instanceof ErrorReplicaResponse) {
+            ErrorReplicaResponse response = (ErrorReplicaResponse) message;
+
+            Throwable err = response.throwable();
+
+            Throwable cause = ExceptionUtils.unwrapCause(err);
+
+            if (cause instanceof DelayedAckException) {
+                // Keep compatibility.
+                DelayedAckException err0 = (DelayedAckException) cause;
+
+                transactionInflights.removeInflight(err0.txId(), err0);
+            }
+
             return;
         }
 

Reply via email to