This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 72fcb8b0fb8 IGNITE-25464 Fix double write intent switch application (#5894)
72fcb8b0fb8 is described below
commit 72fcb8b0fb811b6a6d823790a700175cfe4a8be1
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon May 26 21:55:41 2025 +0400
IGNITE-25464 Fix double write intent switch application (#5894)
---
.../apache/ignite/internal/tx/ItTxCleanupTest.java | 85 ++++++++++++++++++++++
.../internal/tx/impl/TxCleanupExceptionUtils.java | 29 ++++++++
.../internal/tx/impl/TxCleanupRequestHandler.java | 43 ++++++-----
.../internal/tx/impl/TxCleanupRequestSender.java | 17 +++++
.../ignite/internal/tx/impl/TxMessageSender.java | 5 +-
.../tx/impl/WriteIntentSwitchProcessor.java | 10 +--
6 files changed, 165 insertions(+), 24 deletions(-)
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
new file mode 100644
index 00000000000..b758fdbd0dd
--- /dev/null
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class ItTxCleanupTest extends ClusterPerTestIntegrationTest {
+ public static final String TABLE_NAME = "TEST_TABLE";
+ private IgniteImpl ignite;
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeEach
+ void init() {
+ ignite = igniteImpl(0);
+ }
+
+ @ParameterizedTest(name = "readsOnly = {0}")
+ @ValueSource(booleans = {true, false})
+ void writeIntentSwitchHappensOncePerTx(boolean readsOnly) throws Exception {
+ ignite.sql().executeScript(
+ "CREATE ZONE TEST_ZONE (partitions 1, replicas 1) storage profiles ['" + CatalogService.DEFAULT_STORAGE_PROFILE + "']; "
+ + "CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR(20)) ZONE TEST_ZONE"
+ );
+
+ AtomicInteger writeIntentSwitchRequestCount = new AtomicInteger();
+
+ ignite.dropMessages((recipientId, message) -> {
+ if (message instanceof WriteIntentSwitchReplicaRequest) {
+ writeIntentSwitchRequestCount.incrementAndGet();
+ }
+
+ return false;
+ });
+
+ ignite.transactions().runInTransaction(tx -> {
+ KeyValueView<Integer, String> view = ignite.tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
+
+ if (readsOnly) {
+ view.get(tx, 1);
+ } else {
+ view.put(tx, 1, "one");
+ }
+ });
+
+ // We should see one WI switch...
+ assertTrue(waitForCondition(() -> writeIntentSwitchRequestCount.get() > 0, SECONDS.toMillis(10)));
+
+ // ... but not more than one.
+ waitForCondition(() -> writeIntentSwitchRequestCount.get() > 1, SECONDS.toMillis(1));
+ assertThat(writeIntentSwitchRequestCount.get(), is(1));
+ }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupExceptionUtils.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupExceptionUtils.java
new file mode 100644
index 00000000000..a5cfefa9d7b
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupExceptionUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+
+class TxCleanupExceptionUtils {
+ static boolean writeIntentSwitchFailureShouldBeLogged(Throwable failure) {
+ return !hasCause(failure, NodeStoppingException.class, TimeoutException.class);
+ }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 165f6187188..f182311c5b3 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
+import static org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,12 +35,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
@@ -57,6 +59,8 @@ import org.jetbrains.annotations.Nullable;
* Handles TX Cleanup request ({@link TxCleanupMessage}).
*/
public class TxCleanupRequestHandler {
+ private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRequestHandler.class);
+
/** Tx messages factory. */
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
@@ -172,12 +176,22 @@ public class TxCleanupRequestHandler {
// No need to wait on this future.
writeIntentSwitches.forEach((groupId, future) -> {
if (future.isCompletedExceptionally()) {
- writeIntentSwitchProcessor.switchWriteIntentsWithRetry(
- txCleanupMessage.commit(),
- txCleanupMessage.commitTimestamp(),
- txCleanupMessage.txId(),
- groupId
- ).thenAccept(this::processWriteIntentSwitchResponse);
+ writeIntentSwitchProcessor
+ .switchWriteIntentsWithRetry(
+ txCleanupMessage.commit(),
+ txCleanupMessage.commitTimestamp(),
+ txCleanupMessage.txId(),
+ groupId
+ )
+ .thenAccept(this::processWriteIntentSwitchResponse)
+ .whenComplete((retryRes, retryEx) -> {
+ if (retryEx != null && writeIntentSwitchFailureShouldBeLogged(retryEx)) {
+ LOG.warn(
+ "Second cleanup attempt failed (the transaction outcome is not affected) [txId={}]",
+ retryEx, txCleanupMessage.txId()
+ );
+ }
+ });
}
});
}
@@ -226,21 +240,16 @@ public class TxCleanupRequestHandler {
}
/**
- * Process the replication response from a write intent switch request.
+ * Process the replication response from a write intent switch result.
*
- * @param response Write intent replication response.
+ * @param result Write intent replication result.
*/
- private void processWriteIntentSwitchResponse(ReplicaResponse response) {
- if (response == null) {
+ private void processWriteIntentSwitchResponse(WriteIntentSwitchReplicatedInfo result) {
+ if (result == null) {
return;
}
- Object result = response.result();
-
- assert (result instanceof WriteIntentSwitchReplicatedInfo) :
- "Unexpected type of cleanup replication response: [result=" + result + "].";
-
- writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo) result);
+ writeIntentSwitchReplicated(result);
}
/**
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 16bac3bbdb2..b66c05b6c5c 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.tx.impl.TxCleanupExceptionUtils.writeIntentSwitchFailureShouldBeLogged;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.TxState;
@@ -52,6 +55,8 @@ import org.jetbrains.annotations.Nullable;
* Sends TX Cleanup request.
*/
public class TxCleanupRequestSender {
+ private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRequestSender.class);
+
/** Placement driver helper. */
private final PlacementDriverHelper placementDriverHelper;
@@ -313,6 +318,18 @@ public class TxCleanupRequestSender {
return CompletableFuture.<Void>failedFuture(throwable);
}
+ if (networkMessage instanceof TxCleanupMessageErrorResponse) {
+ TxCleanupMessageErrorResponse errorResponse = (TxCleanupMessageErrorResponse) networkMessage;
+ if (writeIntentSwitchFailureShouldBeLogged(errorResponse.throwable())) {
+ LOG.warn(
+ "First cleanup attempt failed (the transaction outcome is not affected) [txId={}]",
+ errorResponse.throwable(), txId
+ );
+ }
+
+ // We don't fail the resulting future as a failing cleanup is not a problem.
+ }
+
return CompletableFutures.<Void>nullCompletedFuture();
})
.thenCompose(v -> v);
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 4cf305d66a9..7fb40c0f2d8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.tx.message.EnlistedPartitionGroupMessage;
import org.apache.ignite.internal.tx.message.PartitionEnlistmentMessage;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateResponse;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.jetbrains.annotations.Nullable;
/**
@@ -89,9 +90,9 @@ public class TxMessageSender {
* @param txId Transaction id.
* @param commit {@code True} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
- * @return Completable future of ReplicaResponse.
+ * @return Completable future of WriteIntentSwitchReplicatedInfo.
*/
- public CompletableFuture<ReplicaResponse> switchWriteIntents(
+ public CompletableFuture<WriteIntentSwitchReplicatedInfo> switchWriteIntents(
String primaryConsistentId,
EnlistedPartitionGroup partition,
UUID txId,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
index 6ddee2e544e..8aa3d107ecc 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
@@ -24,8 +24,8 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyService;
-import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.jetbrains.annotations.Nullable;
@@ -65,7 +65,7 @@ public class WriteIntentSwitchProcessor {
/**
* Run switch write intent on the provided node.
*/
- public CompletableFuture<ReplicaResponse> switchLocalWriteIntents(
+ public CompletableFuture<WriteIntentSwitchReplicatedInfo> switchLocalWriteIntents(
EnlistedPartitionGroup partition,
UUID txId,
boolean commit,
@@ -79,7 +79,7 @@ public class WriteIntentSwitchProcessor {
/**
* Run switch write intent on the primary node of the provided partition in a durable manner.
*/
- public CompletableFuture<ReplicaResponse> switchWriteIntentsWithRetry(
+ public CompletableFuture<WriteIntentSwitchReplicatedInfo> switchWriteIntentsWithRetry(
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
@@ -100,10 +100,10 @@ public class WriteIntentSwitchProcessor {
LOG.info("Failed to switch write intents for Tx [txId={}].", txId, ex);
- return CompletableFuture.<ReplicaResponse>failedFuture(ex);
+ return CompletableFuture.<WriteIntentSwitchReplicatedInfo>failedFuture(ex);
}
- return CompletableFutures.<ReplicaResponse>nullCompletedFuture();
+ return CompletableFutures.<WriteIntentSwitchReplicatedInfo>nullCompletedFuture();
})
.thenCompose(Function.identity());
}