This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fd96bce081d IGNITE-26753 Fix inconsistent behaviour on concurrent 
commit and timeout with 1PC - Fixes #12434.
fd96bce081d is described below

commit fd96bce081df220e6e511fe4e1c22c866193124b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 22 15:38:01 2025 +0300

    IGNITE-26753 Fix inconsistent behaviour on concurrent commit and timeout 
with 1PC - Fixes #12434.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 ...dNearOptimisticSerializableTxPrepareFuture.java |   2 +-
 .../near/GridNearOptimisticTxPrepareFuture.java    |   5 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   5 +
 .../CommunicationMessageDelayTest.java             | 123 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite10.java  |   2 +
 5 files changed, 134 insertions(+), 3 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cd39b95e998..c49744df809 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -271,7 +271,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
      * @param remap Remap flag.
      */
     @Override protected void prepare0(boolean remap, boolean topLocked) {
-        boolean txStateCheck = remap ? tx.state() == PREPARING : 
tx.state(PREPARING);
+        boolean txStateCheck = remap ? tx.state() == PREPARING || tx.state() 
== PREPARED : tx.state(PREPARING);
 
         if (!txStateCheck) {
             if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 68b32610bc8..1858576e0d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -313,7 +313,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
      */
     @Override protected void prepare0(boolean remap, boolean topLocked) {
         try {
-            boolean txStateCheck = remap ? tx.state() == PREPARING : 
tx.state(PREPARING);
+            boolean txStateCheck = remap ? tx.state() == PREPARING || 
tx.state() == PREPARED : tx.state(PREPARING);
 
             if (!txStateCheck) {
                 if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
@@ -967,7 +967,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
                 return;
 
             if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
-                if (parent.tx.remainingTime() == -1 || res.error() instanceof 
IgniteTxTimeoutCheckedException) {
+                if ((parent.tx.state() != PREPARED && 
parent.tx.remainingTime() == -1)
+                    || res.error() instanceof IgniteTxTimeoutCheckedException) 
{
                     parent.onTimeout();
 
                     return;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c9788c1f346..5552517d51f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3381,6 +3381,11 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
         fut.prepare();
 
+        // In case of one phase commit move state one step forward to prohibit 
tx rollback on timeout after send message
+        // to remote node.
+        if (onePhaseCommit)
+            state(PREPARED);
+
         return fut;
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CommunicationMessageDelayTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CommunicationMessageDelayTest.java
new file mode 100644
index 00000000000..d124433ac50
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CommunicationMessageDelayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/** */
+public class CommunicationMessageDelayTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return 
super.getConfiguration(igniteInstanceName).setCommunicationSpi(new 
DelayingTcpCommunicationSpi());
+    }
+
+    /** */
+    @Test
+    public void testTxConsistencyAfterTimeout1PC() throws Exception {
+        IgniteEx srv = startGrid(0);
+        IgniteEx client = startClientGrid(1);
+
+        checkTx(client, srv, PESSIMISTIC, REPEATABLE_READ);
+        checkTx(client, srv, OPTIMISTIC, REPEATABLE_READ);
+        checkTx(client, srv, OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /** */
+    private void checkTx(
+        IgniteEx client,
+        IgniteEx srv,
+        TransactionConcurrency txConcurrency,
+        TransactionIsolation txIsolation
+    ) {
+        long txTimout = 1_000L;
+
+        IgniteCache<Object, Object> cache = client.getOrCreateCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        Transaction tx = client.transactions().txStart(txConcurrency, 
txIsolation, txTimout, 0);
+
+        cache.put(0, 0);
+
+        DelayingTcpCommunicationSpi.delay(srv, 
GridNearTxPrepareResponse.class, 2 * txTimout);
+
+        try {
+            tx.commit();
+
+            assertTrue(cache.containsKey(0));
+        }
+        catch (Exception e) {
+            assertFalse(cache.containsKey(0));
+        }
+
+        cache.remove(0);
+    }
+
+    /** */
+    private static class DelayingTcpCommunicationSpi extends 
TcpCommunicationSpi {
+        /** */
+        private Class<?> delayMsg;
+
+        /** */
+        private long delayTime;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            ClusterNode node,
+            Message msg,
+            IgniteInClosure<IgniteException> ackClosure
+        ) throws IgniteSpiException {
+            if (delayMsg != null && 
((GridIoMessage)msg).message().getClass().equals(delayMsg))
+                doSleep(delayTime);
+
+            super.sendMessage(node, msg, ackClosure);
+        }
+
+        /** */
+        private void delay(Class<?> delayMsg, long delayTime) {
+            this.delayMsg = delayMsg;
+            this.delayTime = delayTime;
+        }
+
+        /** */
+        private static void delay(Ignite ignite, Class<?> delayMsg, long 
delayTime) {
+            
((DelayingTcpCommunicationSpi)ignite.configuration().getCommunicationSpi()).delay(delayMsg,
 delayTime);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
index 826716fb9dc..18eef5f83ff 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest;
 import org.apache.ignite.cache.store.GridStoreLoadCacheTest;
 import org.apache.ignite.cache.store.jdbc.dialect.OracleDialectTest;
 import org.apache.ignite.internal.IgniteInternalCacheRemoveTest;
+import 
org.apache.ignite.internal.managers.communication.CommunicationMessageDelayTest;
 import org.apache.ignite.internal.managers.communication.GridIoManagerSelfTest;
 import 
org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
 import 
org.apache.ignite.internal.managers.communication.IgniteCommunicationBalancePairedConnectionsTest;
@@ -188,6 +189,7 @@ public class IgniteCacheTestSuite10 {
         GridTestUtils.addTestIfNeeded(suite, 
IgniteMessageFactoryImplTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
MessageDirectTypeIdConflictTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteIoCommunicationMessageSerializationTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
CommunicationMessageDelayTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
IgniteIncompleteCacheObjectSelfTest.class, ignoredTests);
 

Reply via email to