This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fd96bce081d IGNITE-26753 Fix inconsistent behaviour on concurrent
commit and timeout with 1PC - Fixes #12434.
fd96bce081d is described below
commit fd96bce081df220e6e511fe4e1c22c866193124b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Wed Oct 22 15:38:01 2025 +0300
IGNITE-26753 Fix inconsistent behaviour on concurrent commit and timeout
with 1PC - Fixes #12434.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
...dNearOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 5 +-
.../cache/distributed/near/GridNearTxLocal.java | 5 +
.../CommunicationMessageDelayTest.java | 123 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite10.java | 2 +
5 files changed, 134 insertions(+), 3 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index cd39b95e998..c49744df809 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -271,7 +271,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture
extends GridNearOptim
* @param remap Remap flag.
*/
@Override protected void prepare0(boolean remap, boolean topLocked) {
- boolean txStateCheck = remap ? tx.state() == PREPARING :
tx.state(PREPARING);
+ boolean txStateCheck = remap ? tx.state() == PREPARING || tx.state()
== PREPARED : tx.state(PREPARING);
if (!txStateCheck) {
if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 68b32610bc8..1858576e0d7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -313,7 +313,7 @@ public class GridNearOptimisticTxPrepareFuture extends
GridNearOptimisticTxPrepa
*/
@Override protected void prepare0(boolean remap, boolean topLocked) {
try {
- boolean txStateCheck = remap ? tx.state() == PREPARING :
tx.state(PREPARING);
+ boolean txStateCheck = remap ? tx.state() == PREPARING ||
tx.state() == PREPARED : tx.state(PREPARING);
if (!txStateCheck) {
if (tx.isRollbackOnly() || tx.setRollbackOnly()) {
@@ -967,7 +967,8 @@ public class GridNearOptimisticTxPrepareFuture extends
GridNearOptimisticTxPrepa
return;
if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
- if (parent.tx.remainingTime() == -1 || res.error() instanceof
IgniteTxTimeoutCheckedException) {
+ if ((parent.tx.state() != PREPARED &&
parent.tx.remainingTime() == -1)
+ || res.error() instanceof IgniteTxTimeoutCheckedException)
{
parent.onTimeout();
return;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c9788c1f346..5552517d51f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3381,6 +3381,11 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
fut.prepare();
+ // In case of one phase commit move state one step forward to prohibit
tx rollback on timeout after send message
+ // to remote node.
+ if (onePhaseCommit)
+ state(PREPARED);
+
return fut;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CommunicationMessageDelayTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CommunicationMessageDelayTest.java
new file mode 100644
index 00000000000..d124433ac50
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CommunicationMessageDelayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/** */
+public class CommunicationMessageDelayTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return
super.getConfiguration(igniteInstanceName).setCommunicationSpi(new
DelayingTcpCommunicationSpi());
+ }
+
+ /** */
+ @Test
+ public void testTxConsistencyAfterTimeout1PC() throws Exception {
+ IgniteEx srv = startGrid(0);
+ IgniteEx client = startClientGrid(1);
+
+ checkTx(client, srv, PESSIMISTIC, REPEATABLE_READ);
+ checkTx(client, srv, OPTIMISTIC, REPEATABLE_READ);
+ checkTx(client, srv, OPTIMISTIC, SERIALIZABLE);
+ }
+
+ /** */
+ private void checkTx(
+ IgniteEx client,
+ IgniteEx srv,
+ TransactionConcurrency txConcurrency,
+ TransactionIsolation txIsolation
+ ) {
+ long txTimout = 1_000L;
+
+ IgniteCache<Object, Object> cache = client.getOrCreateCache(new
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+ Transaction tx = client.transactions().txStart(txConcurrency,
txIsolation, txTimout, 0);
+
+ cache.put(0, 0);
+
+ DelayingTcpCommunicationSpi.delay(srv,
GridNearTxPrepareResponse.class, 2 * txTimout);
+
+ try {
+ tx.commit();
+
+ assertTrue(cache.containsKey(0));
+ }
+ catch (Exception e) {
+ assertFalse(cache.containsKey(0));
+ }
+
+ cache.remove(0);
+ }
+
+ /** */
+ private static class DelayingTcpCommunicationSpi extends
TcpCommunicationSpi {
+ /** */
+ private Class<?> delayMsg;
+
+ /** */
+ private long delayTime;
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(
+ ClusterNode node,
+ Message msg,
+ IgniteInClosure<IgniteException> ackClosure
+ ) throws IgniteSpiException {
+ if (delayMsg != null &&
((GridIoMessage)msg).message().getClass().equals(delayMsg))
+ doSleep(delayTime);
+
+ super.sendMessage(node, msg, ackClosure);
+ }
+
+ /** */
+ private void delay(Class<?> delayMsg, long delayTime) {
+ this.delayMsg = delayMsg;
+ this.delayTime = delayTime;
+ }
+
+ /** */
+ private static void delay(Ignite ignite, Class<?> delayMsg, long
delayTime) {
+
((DelayingTcpCommunicationSpi)ignite.configuration().getCommunicationSpi()).delay(delayMsg,
delayTime);
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
index 826716fb9dc..18eef5f83ff 100755
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.cache.store.CacheTransactionalStoreReadFromBackupTest;
import org.apache.ignite.cache.store.GridStoreLoadCacheTest;
import org.apache.ignite.cache.store.jdbc.dialect.OracleDialectTest;
import org.apache.ignite.internal.IgniteInternalCacheRemoveTest;
+import
org.apache.ignite.internal.managers.communication.CommunicationMessageDelayTest;
import org.apache.ignite.internal.managers.communication.GridIoManagerSelfTest;
import
org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
import
org.apache.ignite.internal.managers.communication.IgniteCommunicationBalancePairedConnectionsTest;
@@ -188,6 +189,7 @@ public class IgniteCacheTestSuite10 {
GridTestUtils.addTestIfNeeded(suite,
IgniteMessageFactoryImplTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
MessageDirectTypeIdConflictTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteIoCommunicationMessageSerializationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
CommunicationMessageDelayTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
IgniteIncompleteCacheObjectSelfTest.class, ignoredTests);