This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7d33bd5c281 IGNITE-27014 Fix NPE in write intent resolution (#6960)
7d33bd5c281 is described below
commit 7d33bd5c281ee9a8a9874e3881b9a27bcbaac9b1
Author: Egor <[email protected]>
AuthorDate: Thu Nov 20 20:50:42 2025 +0400
IGNITE-27014 Fix NPE in write intent resolution (#6960)
Co-authored-by: Egor Kuts <[email protected]>
---
.../tx/distributed/ItTxResourcesVacuumTest.java | 9 ++++----
.../internal/tx/impl/TxCleanupRequestSender.java | 25 ++++++++++++++++++++--
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java
index 304aeae44ee..701adb460db 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxResourcesVacuumTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.InitParametersBuilder;
@@ -85,7 +86,6 @@ import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -486,7 +486,6 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
* </ul>
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27014")
public void testCommitPartitionPrimaryChangesBeforeVacuum() throws
InterruptedException {
// We can't leave TTL as 0 here, because the primary replica is
changed during cleanup, and this means
// WriteIntentSwitchReplicaRequest will be processed not on the
primary. Removing tx state instantly will cause incorrect
@@ -526,7 +525,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
view.upsert(tx, tuple);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
- boolean[] cleanupAllowed = new boolean[1];
+ AtomicBoolean cleanupAllowed = new AtomicBoolean();
commitPartitionLeaseholder.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage) {
@@ -534,7 +533,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
cleanupStarted.complete(null);
- if (!cleanupAllowed[0]) {
+ if (!cleanupAllowed.get()) {
log.info("Test: dropping cleanup on [node= {}].", n);
return true;
@@ -556,7 +555,7 @@ public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
commitPartNodes::contains
);
- cleanupAllowed[0] = true;
+ cleanupAllowed.set(true);
assertThat(commitFut, willCompleteSuccessfully());
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 52f059ccdaf..5ba10589260 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.tx.PartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
@@ -122,17 +123,37 @@ public class TxCleanupRequestSender {
private void markTxnCleanupReplicated(UUID txId, TxState state,
ReplicationGroupId commitPartitionId) {
long cleanupCompletionTimestamp = System.currentTimeMillis();
- txStateVolatileStorage.updateMeta(txId, oldMeta ->
+ TxStateMeta txStateMeta = txStateVolatileStorage.state(txId);
+ final CompletableFuture<HybridTimestamp> commitTimestampFuture;
+ if (state == TxState.COMMITTED && (txStateMeta == null ||
txStateMeta.commitTimestamp() == null)) {
+ commitTimestampFuture =
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartitionId)
+ .thenCompose(replicaMeta -> {
+ String primaryNode =
replicaMeta.getLeaseholder();
+ HybridTimestamp startTime =
replicaMeta.getStartTime();
+ return
txMessageSender.resolveTxStateFromCommitPartition(
+ primaryNode,
+ txId,
+ commitPartitionId,
+
startTime.longValue()).thenApply(TransactionMeta::commitTimestamp);
+ }
+ );
+ } else {
+ HybridTimestamp existingCommitTs = txStateMeta == null ? null :
txStateMeta.commitTimestamp();
+ commitTimestampFuture =
CompletableFuture.completedFuture(existingCommitTs);
+ }
+
+ commitTimestampFuture.thenAccept(commitTimestamp ->
txStateVolatileStorage.updateMeta(txId, oldMeta ->
new TxStateMeta(
oldMeta == null ? state : oldMeta.txState(),
oldMeta == null ? null : oldMeta.txCoordinatorId(),
commitPartitionId,
- oldMeta == null ? null : oldMeta.commitTimestamp(),
+ oldMeta == null ? commitTimestamp :
oldMeta.commitTimestamp(),
oldMeta == null ? null : oldMeta.tx(),
oldMeta == null ? null :
oldMeta.initialVacuumObservationTimestamp(),
cleanupCompletionTimestamp,
oldMeta == null ? null :
oldMeta.isFinishedDueToTimeout()
)
+ )
);
}