This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d33bd5c281 IGNITE-27014 Fix NPE in write intent resolution (#6960)
7d33bd5c281 is described below

commit 7d33bd5c281ee9a8a9874e3881b9a27bcbaac9b1
Author: Egor <[email protected]>
AuthorDate: Thu Nov 20 20:50:42 2025 +0400

    IGNITE-27014 Fix NPE in write intent resolution (#6960)
    
    Co-authored-by: Egor Kuts <[email protected]>
---
 .../tx/distributed/ItTxResourcesVacuumTest.java    |  9 ++++----
 .../internal/tx/impl/TxCleanupRequestSender.java   | 25 ++++++++++++++++++++--
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java
index 304aeae44ee..701adb460db 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.InitParametersBuilder;
@@ -85,7 +86,6 @@ import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -486,7 +486,6 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
      * </ul>
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27014")
     public void testCommitPartitionPrimaryChangesBeforeVacuum() throws InterruptedException {
         // We can't leave TTL as 0 here, because the primary replica is changed during cleanup, and this means
         // WriteIntentSwitchReplicaRequest will be processed not on the primary. Removing tx state instantly will cause incorrect
@@ -526,7 +525,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
         view.upsert(tx, tuple);
 
         CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
-        boolean[] cleanupAllowed = new boolean[1];
+        AtomicBoolean cleanupAllowed = new AtomicBoolean();
 
         commitPartitionLeaseholder.dropMessages((n, msg) -> {
             if (msg instanceof TxCleanupMessage) {
@@ -534,7 +533,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
 
                 cleanupStarted.complete(null);
 
-                if (!cleanupAllowed[0]) {
+                if (!cleanupAllowed.get()) {
                     log.info("Test: dropping cleanup on [node= {}].", n);
 
                     return true;
@@ -556,7 +555,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
                 commitPartNodes::contains
         );
 
-        cleanupAllowed[0] = true;
+        cleanupAllowed.set(true);
 
         assertThat(commitFut, willCompleteSuccessfully());
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 52f059ccdaf..5ba10589260 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
 import org.apache.ignite.internal.tx.PartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
@@ -122,17 +123,37 @@ public class TxCleanupRequestSender {
     private void markTxnCleanupReplicated(UUID txId, TxState state, ReplicationGroupId commitPartitionId) {
         long cleanupCompletionTimestamp = System.currentTimeMillis();
 
-        txStateVolatileStorage.updateMeta(txId, oldMeta ->
+        TxStateMeta txStateMeta = txStateVolatileStorage.state(txId);
+        final CompletableFuture<HybridTimestamp> commitTimestampFuture;
+        if (state == TxState.COMMITTED && (txStateMeta == null || txStateMeta.commitTimestamp() == null)) {
+            commitTimestampFuture = placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartitionId)
+                    .thenCompose(replicaMeta -> {
+                                String primaryNode = replicaMeta.getLeaseholder();
+                                HybridTimestamp startTime = replicaMeta.getStartTime();
+                                return txMessageSender.resolveTxStateFromCommitPartition(
+                                        primaryNode,
+                                        txId,
+                                        commitPartitionId,
+                                        startTime.longValue()).thenApply(TransactionMeta::commitTimestamp);
+                            }
+                    );
+        } else {
+            HybridTimestamp existingCommitTs = txStateMeta == null ? null : txStateMeta.commitTimestamp();
+            commitTimestampFuture = CompletableFuture.completedFuture(existingCommitTs);
+        }
+
+        commitTimestampFuture.thenAccept(commitTimestamp -> txStateVolatileStorage.updateMeta(txId, oldMeta ->
                 new TxStateMeta(
                         oldMeta == null ? state : oldMeta.txState(),
                         oldMeta == null ? null : oldMeta.txCoordinatorId(),
                         commitPartitionId,
-                        oldMeta == null ? null : oldMeta.commitTimestamp(),
+                        oldMeta == null ? commitTimestamp : oldMeta.commitTimestamp(),
                         oldMeta == null ? null : oldMeta.tx(),
                         oldMeta == null ? null : oldMeta.initialVacuumObservationTimestamp(),
                         cleanupCompletionTimestamp,
                         oldMeta == null ? null : oldMeta.isFinishedDueToTimeout()
                 )
+                )
         );
     }
 

Reply via email to