This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8c108291dc4 IGNITE-28380 Fix NPE during txn cleanup (#7887)
8c108291dc4 is described below
commit 8c108291dc47cbd548de6be0c2593aea8035f824
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Apr 2 17:12:36 2026 +0300
IGNITE-28380 Fix NPE during txn cleanup (#7887)
---
...xStateCommitPartitionReplicaRequestHandler.java | 15 ++++-
.../internal/tx/impl/TxCleanupRequestSender.java | 69 +++++++++++-----------
2 files changed, 48 insertions(+), 36 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
index 9648a012d8d..fe0483cc8a6 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.partition.replicator.handlers;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
+import org.apache.ignite.internal.tx.message.RowIdMessage;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.jetbrains.annotations.Nullable;
@@ -117,7 +119,7 @@ public class TxStateCommitPartitionReplicaRequestHandler {
request.senderCurrentConsistencyToken(),
senderGroupId,
senderId,
- request.rowId().asRowId(),
+ extractRowId(request.rowId()),
request.newestCommitTimestamp()
);
} else {
@@ -168,6 +170,12 @@ public class TxStateCommitPartitionReplicaRequestHandler {
// - txn is not finished, volatile state is lost
// - txn was finished, state was vacuumized
// both mean primary replica resolution path.
+ if (rowId == null) {
+ throw new IllegalStateException(format("Failed to resolve
transaction state for transaction "
+ + "using primary replica path, because row id is
not provided [txId={}, commitGroupId={}, tableId={}, "
+ + "senderGroupId={}, readTimestamp={}].", txId,
commitGroupId, tableId, senderGroupId, readTimestamp));
+ }
+
return resolveTxStateFromPrimaryReplica(
txId,
tableId,
@@ -321,4 +329,9 @@ public class TxStateCommitPartitionReplicaRequestHandler {
private void markAbandoned(UUID txId) {
txManager.updateTxMeta(txId, stateMeta -> stateMeta != null ?
stateMeta.abandoned() : null);
}
+
+ @Nullable
+ private static RowId extractRowId(@Nullable RowIdMessage rowIdMessage) {
+ return rowIdMessage == null ? null : rowIdMessage.asRowId();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 95ef78dcc41..953d0baf249 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -24,7 +24,6 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
import static org.apache.ignite.internal.tx.TxStateMeta.builder;
-import static
org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txStateResolutionParameters;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry;
@@ -51,9 +50,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.PartitionEnlistment;
-import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxState;
-import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
@@ -149,41 +146,25 @@ public class TxCleanupRequestSender {
});
if (ctx != null && ctx.partitions.isEmpty()) {
- markTxnCleanupReplicated(info.txId(), ctx.txState,
ctx.commitPartitionId);
+ markTxnCleanupReplicated(info.txId(), ctx.txState,
ctx.commitTimestamp, ctx.commitPartitionId);
writeIntentsReplicated.remove(info.txId());
}
}
- private void markTxnCleanupReplicated(UUID txId, TxState state,
ZonePartitionId commitPartitionId) {
+ private void markTxnCleanupReplicated(
+ UUID txId,
+ TxState state,
+ @Nullable HybridTimestamp commitTimestamp,
+ ZonePartitionId commitPartitionId
+ ) {
long cleanupCompletionTimestamp = System.currentTimeMillis();
- TxStateMeta txStateMeta = txStateVolatileStorage.state(txId);
- final CompletableFuture<HybridTimestamp> commitTimestampFuture;
- if (state == TxState.COMMITTED && (txStateMeta == null ||
txStateMeta.commitTimestamp() == null)) {
- commitTimestampFuture =
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartitionId)
- .thenCompose(replicaMeta -> {
- String primaryNode =
replicaMeta.getLeaseholder();
- HybridTimestamp startTime =
replicaMeta.getStartTime();
- return
txMessageSender.resolveTxStateFromCommitPartition(
-
txStateResolutionParameters().txId(txId).commitGroupId(commitPartitionId).build(),
- primaryNode,
- startTime.longValue()
- )
-
.thenApply(TransactionMeta::commitTimestamp);
- }
- );
- } else {
- HybridTimestamp existingCommitTs = txStateMeta == null ? null :
txStateMeta.commitTimestamp();
- commitTimestampFuture =
CompletableFuture.completedFuture(existingCommitTs);
- }
-
- commitTimestampFuture.thenAccept(commitTimestamp ->
- txStateVolatileStorage.updateMeta(txId, oldMeta ->
builder(oldMeta, state)
- .commitPartitionId(commitPartitionId)
- .commitTimestamp(commitTimestamp)
- .cleanupCompletionTimestamp(cleanupCompletionTimestamp)
- .build())
+ txStateVolatileStorage.updateMeta(txId, oldMeta -> builder(oldMeta,
state)
+ .commitPartitionId(commitPartitionId)
+ .commitTimestamp(commitTimestamp)
+ .cleanupCompletionTimestamp(cleanupCompletionTimestamp)
+ .build()
);
}
@@ -228,7 +209,12 @@ public class TxCleanupRequestSender {
if (commitPartitionId != null) {
writeIntentsReplicated.put(
txId,
- new CleanupContext(commitPartitionId,
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
+ new CleanupContext(
+ commitPartitionId,
+ enlistedPartitions.keySet(),
+ commit ? TxState.COMMITTED : TxState.ABORTED,
+ commitTimestamp
+ )
);
}
@@ -280,8 +266,12 @@ public class TxCleanupRequestSender {
// Start tracking the partitions we want to learn the replication
confirmation from.
writeIntentsReplicated.put(
txId,
- new CleanupContext(commitPartitionId, new
HashSet<>(partitionIds.keySet()),
- commit ? TxState.COMMITTED : TxState.ABORTED)
+ new CleanupContext(
+ commitPartitionId,
+ new HashSet<>(partitionIds.keySet()),
+ commit ? TxState.COMMITTED : TxState.ABORTED,
+ commitTimestamp
+ )
);
}
@@ -492,10 +482,19 @@ public class TxCleanupRequestSender {
*/
private final TxState txState;
- private CleanupContext(ZonePartitionId commitPartitionId,
Set<ZonePartitionId> partitions, TxState txState) {
+ @Nullable
+ private final HybridTimestamp commitTimestamp;
+
+ private CleanupContext(
+ ZonePartitionId commitPartitionId,
+ Set<ZonePartitionId> partitions,
+ TxState txState,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
this.commitPartitionId = commitPartitionId;
this.partitions = partitions;
this.txState = txState;
+ this.commitTimestamp = commitTimestamp;
}
}