This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 72fcb8b0fb8 IGNITE-25464 Fix double write intent switch application (#5894)
72fcb8b0fb8 is described below

commit 72fcb8b0fb811b6a6d823790a700175cfe4a8be1
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon May 26 21:55:41 2025 +0400

    IGNITE-25464 Fix double write intent switch application (#5894)
---
 .../apache/ignite/internal/tx/ItTxCleanupTest.java | 85 ++++++++++++++++++++++
 .../internal/tx/impl/TxCleanupExceptionUtils.java  | 29 ++++++++
 .../internal/tx/impl/TxCleanupRequestHandler.java  | 43 ++++++-----
 .../internal/tx/impl/TxCleanupRequestSender.java   | 17 +++++
 .../ignite/internal/tx/impl/TxMessageSender.java   |  5 +-
 .../tx/impl/WriteIntentSwitchProcessor.java        | 10 +--
 6 files changed, 165 insertions(+), 24 deletions(-)

diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
new file mode 100644
index 00000000000..b758fdbd0dd
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class ItTxCleanupTest extends ClusterPerTestIntegrationTest {
+    public static final String TABLE_NAME = "TEST_TABLE";
+    private IgniteImpl ignite;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeEach
+    void init() {
+        ignite = igniteImpl(0);
+    }
+
+    @ParameterizedTest(name = "readsOnly = {0}")
+    @ValueSource(booleans = {true, false})
+    void writeIntentSwitchHappensOncePerTx(boolean readsOnly) throws Exception 
{
+        ignite.sql().executeScript(
+                "CREATE ZONE TEST_ZONE (partitions 1, replicas 1) storage 
profiles ['" + CatalogService.DEFAULT_STORAGE_PROFILE + "']; "
+                        + "CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY 
KEY, VAL VARCHAR(20)) ZONE TEST_ZONE"
+        );
+
+        AtomicInteger writeIntentSwitchRequestCount = new AtomicInteger();
+
+        ignite.dropMessages((recipientId, message) -> {
+            if (message instanceof WriteIntentSwitchReplicaRequest) {
+                writeIntentSwitchRequestCount.incrementAndGet();
+            }
+
+            return false;
+        });
+
+        ignite.transactions().runInTransaction(tx -> {
+            KeyValueView<Integer, String> view = 
ignite.tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
+
+            if (readsOnly) {
+                view.get(tx, 1);
+            } else {
+                view.put(tx, 1, "one");
+            }
+        });
+
+        // We should see one WI switch...
+        assertTrue(waitForCondition(() -> writeIntentSwitchRequestCount.get() 
> 0, SECONDS.toMillis(10)));
+
+        // ... but not more than one.
+        waitForCondition(() -> writeIntentSwitchRequestCount.get() > 1, 
SECONDS.toMillis(1));
+        assertThat(writeIntentSwitchRequestCount.get(), is(1));
+    }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupExceptionUtils.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupExceptionUtils.java
new file mode 100644
index 00000000000..a5cfefa9d7b
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupExceptionUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+
+class TxCleanupExceptionUtils {
+    static boolean writeIntentSwitchFailureShouldBeLogged(Throwable failure) {
+        return !hasCause(failure, NodeStoppingException.class, 
TimeoutException.class);
+    }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 165f6187188..f182311c5b3 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
+import static 
org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,12 +35,13 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
@@ -57,6 +59,8 @@ import org.jetbrains.annotations.Nullable;
  * Handles TX Cleanup request ({@link TxCleanupMessage}).
  */
 public class TxCleanupRequestHandler {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxCleanupRequestHandler.class);
+
     /** Tx messages factory. */
     private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
 
@@ -172,12 +176,22 @@ public class TxCleanupRequestHandler {
                         // No need to wait on this future.
                         writeIntentSwitches.forEach((groupId, future) -> {
                             if (future.isCompletedExceptionally()) {
-                                
writeIntentSwitchProcessor.switchWriteIntentsWithRetry(
-                                        txCleanupMessage.commit(),
-                                        txCleanupMessage.commitTimestamp(),
-                                        txCleanupMessage.txId(),
-                                        groupId
-                                
).thenAccept(this::processWriteIntentSwitchResponse);
+                                writeIntentSwitchProcessor
+                                        .switchWriteIntentsWithRetry(
+                                                txCleanupMessage.commit(),
+                                                
txCleanupMessage.commitTimestamp(),
+                                                txCleanupMessage.txId(),
+                                                groupId
+                                        )
+                                        
.thenAccept(this::processWriteIntentSwitchResponse)
+                                        .whenComplete((retryRes, retryEx) -> {
+                                            if (retryEx != null && 
writeIntentSwitchFailureShouldBeLogged(retryEx)) {
+                                                LOG.warn(
+                                                        "Second cleanup 
attempt failed (the transaction outcome is not affected) [txId={}]",
+                                                        retryEx, 
txCleanupMessage.txId()
+                                                );
+                                            }
+                                        });
                             }
                         });
                     }
@@ -226,21 +240,16 @@ public class TxCleanupRequestHandler {
     }
 
     /**
-     * Process the replication response from a write intent switch request.
+     * Process the replication response from a write intent switch result.
      *
-     * @param response Write intent replication response.
+     * @param result Write intent replication result.
      */
-    private void processWriteIntentSwitchResponse(ReplicaResponse response) {
-        if (response == null) {
+    private void 
processWriteIntentSwitchResponse(WriteIntentSwitchReplicatedInfo result) {
+        if (result == null) {
             return;
         }
 
-        Object result = response.result();
-
-        assert (result instanceof WriteIntentSwitchReplicatedInfo) :
-                "Unexpected type of cleanup replication response: [result=" + 
result + "].";
-
-        writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo) result);
+        writeIntentSwitchReplicated(result);
     }
 
     /**
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 16bac3bbdb2..b66c05b6c5c 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static 
org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.tx.PartitionEnlistment;
 import org.apache.ignite.internal.tx.TxState;
@@ -52,6 +55,8 @@ import org.jetbrains.annotations.Nullable;
  * Sends TX Cleanup request.
  */
 public class TxCleanupRequestSender {
+    private static final IgniteLogger LOG = 
Loggers.forClass(TxCleanupRequestSender.class);
+
     /** Placement driver helper. */
     private final PlacementDriverHelper placementDriverHelper;
 
@@ -313,6 +318,18 @@ public class TxCleanupRequestSender {
                         return CompletableFuture.<Void>failedFuture(throwable);
                     }
 
+                    if (networkMessage instanceof 
TxCleanupMessageErrorResponse) {
+                        TxCleanupMessageErrorResponse errorResponse = 
(TxCleanupMessageErrorResponse) networkMessage;
+                        if 
(writeIntentSwitchFailureShouldBeLogged(errorResponse.throwable())) {
+                            LOG.warn(
+                                    "First cleanup attempt failed (the 
transaction outcome is not affected) [txId={}]",
+                                    errorResponse.throwable(), txId
+                            );
+                        }
+
+                        // We don't fail the resulting future as a failing 
cleanup is not a problem.
+                    }
+
                     return CompletableFutures.<Void>nullCompletedFuture();
                 })
                 .thenCompose(v -> v);
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 4cf305d66a9..7fb40c0f2d8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.tx.message.EnlistedPartitionGroupMessage;
 import org.apache.ignite.internal.tx.message.PartitionEnlistmentMessage;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateResponse;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -89,9 +90,9 @@ public class TxMessageSender {
      * @param txId Transaction id.
      * @param commit {@code True} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
-     * @return Completable future of ReplicaResponse.
+     * @return Completable future of WriteIntentSwitchReplicatedInfo.
      */
-    public CompletableFuture<ReplicaResponse> switchWriteIntents(
+    public CompletableFuture<WriteIntentSwitchReplicatedInfo> 
switchWriteIntents(
             String primaryConsistentId,
             EnlistedPartitionGroup partition,
             UUID txId,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
index 6ddee2e544e..8aa3d107ecc 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
@@ -24,8 +24,8 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import 
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.jetbrains.annotations.Nullable;
@@ -65,7 +65,7 @@ public class WriteIntentSwitchProcessor {
     /**
      * Run switch write intent on the provided node.
      */
-    public CompletableFuture<ReplicaResponse> switchLocalWriteIntents(
+    public CompletableFuture<WriteIntentSwitchReplicatedInfo> 
switchLocalWriteIntents(
             EnlistedPartitionGroup partition,
             UUID txId,
             boolean commit,
@@ -79,7 +79,7 @@ public class WriteIntentSwitchProcessor {
     /**
      * Run switch write intent on the primary node of the provided partition 
in a durable manner.
      */
-    public CompletableFuture<ReplicaResponse> switchWriteIntentsWithRetry(
+    public CompletableFuture<WriteIntentSwitchReplicatedInfo> 
switchWriteIntentsWithRetry(
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
@@ -100,10 +100,10 @@ public class WriteIntentSwitchProcessor {
 
                         LOG.info("Failed to switch write intents for Tx 
[txId={}].", txId, ex);
 
-                        return 
CompletableFuture.<ReplicaResponse>failedFuture(ex);
+                        return 
CompletableFuture.<WriteIntentSwitchReplicatedInfo>failedFuture(ex);
                     }
 
-                    return 
CompletableFutures.<ReplicaResponse>nullCompletedFuture();
+                    return 
CompletableFutures.<WriteIntentSwitchReplicatedInfo>nullCompletedFuture();
                 })
                 .thenCompose(Function.identity());
     }

Reply via email to