This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8355b3f3ef2 IGNITE-27598 Fixed StackOverflow in txs clean up process
(#7443)
8355b3f3ef2 is described below
commit 8355b3f3ef26d43dfca7725992668d44f72e017f
Author: Denis Chudov <[email protected]>
AuthorDate: Wed Jan 21 18:11:44 2026 +0400
IGNITE-27598 Fixed StackOverflow in txs clean up process (#7443)
---
.../internal/testframework/IgniteTestUtils.java | 41 +++++++++++
.../internal/tx/impl/TxCleanupRequestSender.java | 84 ++++++++++++++++++----
.../ignite/internal/tx/impl/TxManagerImpl.java | 9 ++-
.../apache/ignite/internal/tx/TxCleanupTest.java | 10 ++-
4 files changed, 126 insertions(+), 18 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 08bcbf50428..bc104d20fc3 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -52,6 +52,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.awaitility.Awaitility;
import org.hamcrest.CustomMatcher;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInfo;
@@ -1091,4 +1093,43 @@ public final class IgniteTestUtils {
public static UUID deriveUuidFrom(String str) {
return new UUID(str.hashCode(), new
StringBuilder(str).reverse().toString().hashCode());
}
+
+ /**
+ * Returns an executor service that runs every submitted task synchronously in the calling thread.
+ *
+ * <p>The returned service honors the {@link ExecutorService} lifecycle: after shutdown it rejects
+ * new tasks and reports itself as shut down. Since tasks are never queued, it also reports itself
+ * as terminated once shut down.
+ *
+ * @return Executor service.
+ */
+ public static ExecutorService testSyncExecutorService() {
+ return new AbstractExecutorService() {
+ private volatile boolean shutdown;
+
+ @Override
+ public void shutdown() {
+ shutdown = true;
+ }
+
+ @Override
+ public @NotNull List<Runnable> shutdownNow() {
+ shutdown = true;
+
+ // Tasks are executed inline and never queued, so there is nothing pending to return.
+ return List.of();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ // No task is ever queued; tasks already running inline in callers' threads are not tracked.
+ return shutdown;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) {
+ // Nothing to wait for: termination coincides with shutdown.
+ return shutdown;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (shutdown) {
+ throw new java.util.concurrent.RejectedExecutionException("Executor service is shut down.");
+ }
+
+ command.run();
+ }
+ };
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index fefa113562c..70a98f7df38 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
import static org.apache.ignite.internal.tx.TxStateMeta.builder;
import static
org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
@@ -36,8 +37,11 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -59,6 +63,10 @@ import org.jetbrains.annotations.Nullable;
public class TxCleanupRequestSender {
private static final IgniteLogger LOG =
Loggers.forClass(TxCleanupRequestSender.class);
+ /** Number of cleanup attempts after which further recoverable failures are logged as (throttled) warnings. */
+ private static final int ATTEMPTS_LOG_THRESHOLD = 100;
+
+ /** Throttled logger for warnings about cleanup that keeps being retried. */
+ private final IgniteThrottledLogger throttledLog;
+
/** Placement driver helper. */
private final PlacementDriverHelper placementDriverHelper;
@@ -70,21 +78,30 @@ public class TxCleanupRequestSender {
/** Local transaction state storage. */
private final VolatileTxStateMetaStorage txStateVolatileStorage;
+ /** Executor that executes async cleanup actions. */
+ private final ExecutorService cleanupExecutor;
+
/**
* The constructor.
*
* @param txMessageSender Message sender.
* @param placementDriverHelper Placement driver helper.
* @param txStateVolatileStorage Volatile transaction state storage.
+ * @param cleanupExecutor Executor on which cleanup responses are handled and retries are issued; handling them
+ *     asynchronously keeps a long chain of retries from nesting on a single call stack.
+ * @param throttledLogExecutor Executor to clean up the throttled logger cache.
*/
public TxCleanupRequestSender(
TxMessageSender txMessageSender,
PlacementDriverHelper placementDriverHelper,
- VolatileTxStateMetaStorage txStateVolatileStorage
+ VolatileTxStateMetaStorage txStateVolatileStorage,
+ ExecutorService cleanupExecutor,
+ Executor throttledLogExecutor
) {
this.txMessageSender = txMessageSender;
this.placementDriverHelper = placementDriverHelper;
this.txStateVolatileStorage = txStateVolatileStorage;
+ this.cleanupExecutor = cleanupExecutor;
+ // Wrap the existing class logger instead of looking it up a second time.
+ this.throttledLog = toThrottledLogger(LOG, throttledLogExecutor);
}
/**
@@ -164,7 +181,7 @@ public class TxCleanupRequestSender {
* @return Completable future of Void.
*/
public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId,
String node, UUID txId) {
- return sendCleanupMessageWithRetries(commitPartitionId, false, null,
txId, node, null);
+ // First attempt: no retries have been made yet; each retry passes attemptsMade + 1.
+ return sendCleanupMessageWithRetries(commitPartitionId, false, null,
txId, node, null, 0);
}
/**
@@ -201,7 +218,7 @@ public class TxCleanupRequestSender {
enlistedPartitionGroups.add(new
EnlistedPartitionGroup(partitionId, partition.tableIds()));
});
- return cleanupPartitions(commitPartitionId, partitionsByPrimaryName,
commit, commitTimestamp, txId);
+ return cleanupPartitions(commitPartitionId, partitionsByPrimaryName,
commit, commitTimestamp, txId, 0);
}
/**
@@ -220,6 +237,17 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
+ ) {
+ return cleanup(commitPartitionId, partitions, commit, commitTimestamp,
txId, 0);
+ }
+
+ /**
+ * Same as the public {@code cleanup} overload that takes a collection of partitions, but additionally carries
+ * the number of cleanup attempts already made for the transaction, which is passed on to the nested calls.
+ *
+ * @param commitPartitionId Commit partition id, may be {@code null}.
+ * @param partitions Partitions to clean up.
+ * @param commit Commit flag.
+ * @param commitTimestamp Commit timestamp, may be {@code null}.
+ * @param txId Transaction id.
+ * @param attemptsMade Number of cleanup attempts made before this call.
+ * @return Completable future of Void.
+ */
+ private CompletableFuture<Void> cleanup(
+ @Nullable ZonePartitionId commitPartitionId,
+ Collection<EnlistedPartitionGroup> partitions,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId,
+ int attemptsMade
) {
Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds =
partitions.stream()
.collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -240,14 +268,22 @@ public class TxCleanupRequestSender {
commit,
commitTimestamp,
txId,
-
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds)
+
toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds),
+ attemptsMade
);
Map<String, List<EnlistedPartitionGroup>>
partitionsByPrimaryName = toPartitionInfosByPrimaryName(
partitionData.partitionsByNode,
partitionIds
);
- return cleanupPartitions(commitPartitionId,
partitionsByPrimaryName, commit, commitTimestamp, txId);
+ return cleanupPartitions(
+ commitPartitionId,
+ partitionsByPrimaryName,
+ commit,
+ commitTimestamp,
+ txId,
+ attemptsMade
+ );
});
}
@@ -273,7 +309,8 @@ public class TxCleanupRequestSender {
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
- List<EnlistedPartitionGroup> partitionsWithoutPrimary
+ List<EnlistedPartitionGroup> partitionsWithoutPrimary,
+ int attemptsMade
) {
Map<ZonePartitionId, EnlistedPartitionGroup> partitionIds =
partitionsWithoutPrimary.stream()
.collect(toMap(EnlistedPartitionGroup::groupId, identity()));
@@ -286,7 +323,7 @@ public class TxCleanupRequestSender {
partitionIdsByPrimaryName,
partitionIds
);
- return cleanupPartitions(commitPartitionId,
partitionsByPrimaryName, commit, commitTimestamp, txId);
+ return cleanupPartitions(commitPartitionId,
partitionsByPrimaryName, commit, commitTimestamp, txId, attemptsMade);
});
}
@@ -295,7 +332,8 @@ public class TxCleanupRequestSender {
Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
- UUID txId
+ UUID txId,
+ int attemptsMade
) {
List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>();
@@ -304,7 +342,7 @@ public class TxCleanupRequestSender {
List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node,
- commitPartitionId == null ? null : nodePartitions));
+ commitPartitionId == null ? null : nodePartitions,
attemptsMade));
}
return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
@@ -316,12 +354,22 @@ public class TxCleanupRequestSender {
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
String node,
- @Nullable Collection<EnlistedPartitionGroup> partitions
+ @Nullable Collection<EnlistedPartitionGroup> partitions,
+ int attemptsMade
) {
return txMessageSender.cleanup(node, partitions, txId, commit,
commitTimestamp)
- .handle((networkMessage, throwable) -> {
+ .handleAsync((networkMessage, throwable) -> {
if (throwable != null) {
if
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
+ if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
+ // attemptsMade counts the attempts made before this one, so the attempt that has just failed is number
+ // attemptsMade + 1. Log the real count rather than the constant threshold.
+ throttledLog.warn(
+ "Unsuccessful transaction cleanup after {} attempts, keep retrying [txId={}]",
+ throwable,
+ attemptsMade + 1,
+ txId
+ );
+ }
+
// In the case of a failure we repeat the process,
but start with finding correct primary replicas
// for this subset of partitions. If nothing
changed in terms of the nodes and primaries
// we eventually will call ourselves with the same
parameters.
@@ -332,12 +380,20 @@ public class TxCleanupRequestSender {
if (partitions == null) {
// If we don't have any partition, which is
the recovery or unlock only case,
// just try again with the same node.
- return
sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId,
node, partitions);
+ return sendCleanupMessageWithRetries(
+ commitPartitionId,
+ commit,
+ commitTimestamp,
+ txId,
+ node,
+ partitions,
+ attemptsMade + 1
+ );
}
// Run a cleanup that finds new primaries for the
given partitions.
// This covers the case when a partition primary
died and we still want to switch write intents.
- return cleanup(commitPartitionId, partitions,
commit, commitTimestamp, txId);
+ return cleanup(commitPartitionId, partitions,
commit, commitTimestamp, txId, attemptsMade + 1);
}
return CompletableFuture.<Void>failedFuture(throwable);
@@ -356,7 +412,7 @@ public class TxCleanupRequestSender {
}
return CompletableFutures.<Void>nullCompletedFuture();
- })
+ }, cleanupExecutor)
.thenCompose(v -> v);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 6f25ad4e6b1..76074bed750 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -389,8 +389,13 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
writeIntentSwitchPool
);
- txCleanupRequestSender =
- new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, txStateVolatileStorage);
+ // Cleanup responses (and the retries they trigger) are handled on writeIntentSwitchPool rather than
+ // inline; commonScheduler is used to clean up the throttled logger cache.
+ txCleanupRequestSender = new TxCleanupRequestSender(
+ txMessageSender,
+ placementDriverHelper,
+ txStateVolatileStorage,
+ writeIntentSwitchPool,
+ commonScheduler
+ );
txMetrics = new TransactionMetricsSource(clockService);
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index fbbf820a26d..060af184b42 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -21,6 +21,7 @@ import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testSyncExecutorService;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -141,8 +142,13 @@ public class TxCleanupTest extends IgniteAbstractTest {
PlacementDriverHelper placementDriverHelper = new
PlacementDriverHelper(placementDriver, clockService);
- cleanupRequestSender = new TxCleanupRequestSender(txMessageSender,
placementDriverHelper, mock(
- VolatileTxStateMetaStorage.class));
+ // Both executors run tasks inline in the calling thread, so cleanup responses and retries are handled
+ // synchronously here. NOTE(review): retries therefore still nest on the caller's stack, so this setup
+ // does not exercise the stack-overflow fix itself.
+ cleanupRequestSender = new TxCleanupRequestSender(
+ txMessageSender,
+ placementDriverHelper,
+ mock(VolatileTxStateMetaStorage.class),
+ testSyncExecutorService(),
+ Runnable::run
+ );
}
@Test