This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8355b3f3ef2 IGNITE-27598 Fixed StackOverflow in txs clean up process (#7443)
8355b3f3ef2 is described below

commit 8355b3f3ef26d43dfca7725992668d44f72e017f
Author: Denis Chudov <[email protected]>
AuthorDate: Wed Jan 21 18:11:44 2026 +0400

    IGNITE-27598 Fixed StackOverflow in txs clean up process (#7443)
---
 .../internal/testframework/IgniteTestUtils.java    | 41 +++++++++++
 .../internal/tx/impl/TxCleanupRequestSender.java   | 84 ++++++++++++++++++----
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  9 ++-
 .../apache/ignite/internal/tx/TxCleanupTest.java   | 10 ++-
 4 files changed, 126 insertions(+), 18 deletions(-)

diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 08bcbf50428..bc104d20fc3 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -52,6 +52,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.awaitility.Awaitility;
 import org.hamcrest.CustomMatcher;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestInfo;
@@ -1091,4 +1093,43 @@ public final class IgniteTestUtils {
     public static UUID deriveUuidFrom(String str) {
         return new UUID(str.hashCode(), new 
StringBuilder(str).reverse().toString().hashCode());
     }
+
+    /**
+     * Non-concurrent executor service for test purposes.
+     *
+     * @return Executor service.
+     */
+    public static ExecutorService testSyncExecutorService() {
+        return new AbstractExecutorService() {
+            @Override
+            public void shutdown() {
+                // No-op.
+            }
+
+            @Override
+            public @NotNull List<Runnable> shutdownNow() {
+                return List.of();
+            }
+
+            @Override
+            public boolean isShutdown() {
+                return false;
+            }
+
+            @Override
+            public boolean isTerminated() {
+                return false;
+            }
+
+            @Override
+            public boolean awaitTermination(long timeout, TimeUnit unit) 
throws InterruptedException {
+                return false;
+            }
+
+            @Override
+            public void execute(Runnable command) {
+                command.run();
+            }
+        };
+    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index fefa113562c..70a98f7df38 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
 import static org.apache.ignite.internal.tx.TxStateMeta.builder;
 import static 
org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
 
@@ -36,8 +37,11 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -59,6 +63,10 @@ import org.jetbrains.annotations.Nullable;
 public class TxCleanupRequestSender {
     private static final IgniteLogger LOG = 
Loggers.forClass(TxCleanupRequestSender.class);
 
+    private static final int ATTEMPTS_LOG_THRESHOLD = 100;
+
+    private final IgniteThrottledLogger throttledLog;
+
     /** Placement driver helper. */
     private final PlacementDriverHelper placementDriverHelper;
 
@@ -70,21 +78,30 @@ public class TxCleanupRequestSender {
     /** Local transaction state storage. */
     private final VolatileTxStateMetaStorage txStateVolatileStorage;
 
+    /** Executor that executes async cleanup actions. */
+    private final ExecutorService cleanupExecutor;
+
     /**
      * The constructor.
      *
      * @param txMessageSender Message sender.
      * @param placementDriverHelper Placement driver helper.
      * @param txStateVolatileStorage Volatile transaction state storage.
+     * @param cleanupExecutor Cleanup executor.
+     * @param throttledLogExecutor Executor to clean up the throttled logger 
cache.
      */
     public TxCleanupRequestSender(
             TxMessageSender txMessageSender,
             PlacementDriverHelper placementDriverHelper,
-            VolatileTxStateMetaStorage txStateVolatileStorage
+            VolatileTxStateMetaStorage txStateVolatileStorage,
+            ExecutorService cleanupExecutor,
+            Executor throttledLogExecutor
     ) {
         this.txMessageSender = txMessageSender;
         this.placementDriverHelper = placementDriverHelper;
         this.txStateVolatileStorage = txStateVolatileStorage;
+        this.cleanupExecutor = cleanupExecutor;
+        this.throttledLog = 
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), 
throttledLogExecutor);
     }
 
     /**
@@ -164,7 +181,7 @@ public class TxCleanupRequestSender {
      * @return Completable future of Void.
      */
     public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, 
String node, UUID txId) {
-        return sendCleanupMessageWithRetries(commitPartitionId, false, null, 
txId, node, null);
+        return sendCleanupMessageWithRetries(commitPartitionId, false, null, 
txId, node, null, 0);
     }
 
     /**
@@ -201,7 +218,7 @@ public class TxCleanupRequestSender {
             enlistedPartitionGroups.add(new 
EnlistedPartitionGroup(partitionId, partition.tableIds()));
         });
 
-        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, 
commit, commitTimestamp, txId);
+        return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, 
commit, commitTimestamp, txId, 0);
     }
 
     /**
@@ -220,6 +237,17 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
+    ) {
+        return cleanup(commitPartitionId, partitions, commit, commitTimestamp, 
txId, 0);
+    }
+
+    private CompletableFuture<Void> cleanup(
+            @Nullable ZonePartitionId commitPartitionId,
+            Collection<EnlistedPartitionGroup> partitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            int attemptsMade
     ) {
         Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = 
partitions.stream()
                 .collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -240,14 +268,22 @@ public class TxCleanupRequestSender {
                             commit,
                             commitTimestamp,
                             txId,
-                            
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds)
+                            
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
+                            attemptsMade
                     );
 
                     Map<String, List<EnlistedPartitionGroup>> 
partitionsByPrimaryName = toPartitionInfosByPrimaryName(
                             partitionData.partitionsByNode,
                             partitionIds
                     );
-                    return cleanupPartitions(commitPartitionId, 
partitionsByPrimaryName, commit, commitTimestamp, txId);
+                    return cleanupPartitions(
+                            commitPartitionId,
+                            partitionsByPrimaryName,
+                            commit,
+                            commitTimestamp,
+                            txId,
+                            attemptsMade
+                    );
                 });
     }
 
@@ -273,7 +309,8 @@ public class TxCleanupRequestSender {
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
-            List<EnlistedPartitionGroup> partitionsWithoutPrimary
+            List<EnlistedPartitionGroup> partitionsWithoutPrimary,
+            int attemptsMade
     ) {
         Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds = 
partitionsWithoutPrimary.stream()
                 .collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -286,7 +323,7 @@ public class TxCleanupRequestSender {
                             partitionIdsByPrimaryName,
                             partitionIds
                     );
-                    return cleanupPartitions(commitPartitionId, 
partitionsByPrimaryName, commit, commitTimestamp, txId);
+                    return cleanupPartitions(commitPartitionId, 
partitionsByPrimaryName, commit, commitTimestamp, txId, attemptsMade);
                 });
     }
 
@@ -295,7 +332,8 @@ public class TxCleanupRequestSender {
             Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
-            UUID txId
+            UUID txId,
+            int attemptsMade
     ) {
         List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();
 
@@ -304,7 +342,7 @@ public class TxCleanupRequestSender {
             List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
 
             
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, 
commitTimestamp, txId, node,
-                    commitPartitionId == null ? null : nodePartitions));
+                    commitPartitionId == null ? null : nodePartitions, 
attemptsMade));
         }
 
         return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
@@ -316,12 +354,22 @@ public class TxCleanupRequestSender {
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
             String node,
-            @Nullable Collection<EnlistedPartitionGroup> partitions
+            @Nullable Collection<EnlistedPartitionGroup> partitions,
+            int attemptsMade
     ) {
         return txMessageSender.cleanup(node, partitions, txId, commit, 
commitTimestamp)
-                .handle((networkMessage, throwable) -> {
+                .handleAsync((networkMessage, throwable) -> {
                     if (throwable != null) {
                         if 
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
+                            if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
+                                throttledLog.warn(
+                                        "Unsuccessful transaction cleanup 
after {} attempts, keep retrying [txId={}]",
+                                        throwable,
+                                        ATTEMPTS_LOG_THRESHOLD,
+                                        txId
+                                );
+                            }
+
                             // In the case of a failure we repeat the process, 
but start with finding correct primary replicas
                             // for this subset of partitions. If nothing 
changed in terms of the nodes and primaries
                             // we eventually will call ourselves with the same 
parameters.
@@ -332,12 +380,20 @@ public class TxCleanupRequestSender {
                             if (partitions == null) {
                                 // If we don't have any partition, which is 
the recovery or unlock only case,
                                 // just try again with the same node.
-                                return 
sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, 
node, partitions);
+                                return sendCleanupMessageWithRetries(
+                                        commitPartitionId,
+                                        commit,
+                                        commitTimestamp,
+                                        txId,
+                                        node,
+                                        partitions,
+                                        attemptsMade + 1
+                                );
                             }
 
                             // Run a cleanup that finds new primaries for the 
given partitions.
                             // This covers the case when a partition primary 
died and we still want to switch write intents.
-                            return cleanup(commitPartitionId, partitions, 
commit, commitTimestamp, txId);
+                            return cleanup(commitPartitionId, partitions, 
commit, commitTimestamp, txId, attemptsMade + 1);
                         }
 
                         return CompletableFuture.<Void>failedFuture(throwable);
@@ -356,7 +412,7 @@ public class TxCleanupRequestSender {
                     }
 
                     return CompletableFutures.<Void>nullCompletedFuture();
-                })
+                }, cleanupExecutor)
                 .thenCompose(v -> v);
     }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 6f25ad4e6b1..76074bed750 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -389,8 +389,13 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
                 writeIntentSwitchPool
         );
 
-        txCleanupRequestSender =
-                new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, txStateVolatileStorage);
+        txCleanupRequestSender = new TxCleanupRequestSender(
+                txMessageSender,
+                placementDriverHelper,
+                txStateVolatileStorage,
+                writeIntentSwitchPool,
+                commonScheduler
+        );
 
         txMetrics = new TransactionMetricsSource(clockService);
     }
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index fbbf820a26d..060af184b42 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -21,6 +21,7 @@ import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testSyncExecutorService;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -141,8 +142,13 @@ public class TxCleanupTest extends IgniteAbstractTest {
 
         PlacementDriverHelper placementDriverHelper = new 
PlacementDriverHelper(placementDriver, clockService);
 
-        cleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, mock(
-                VolatileTxStateMetaStorage.class));
+        cleanupRequestSender = new TxCleanupRequestSender(
+                txMessageSender,
+                placementDriverHelper,
+                mock(VolatileTxStateMetaStorage.class),
+                testSyncExecutorService(),
+                Runnable::run
+        );
     }
 
     @Test

Reply via email to