This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2699a72bcbe IGNITE-27089 Fix handling for delayed ack failures
2699a72bcbe is described below
commit 2699a72bcbe6b2cb2a10b3fdb8b63ec35ef90c63
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Nov 28 14:29:49 2025 +0300
IGNITE-27089 Fix handling for delayed ack failures
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../internal/client/proto/ErrorExtensions.java | 2 +
.../handler/ClientInboundMessageHandler.java | 6 +
.../internal/client/ClientDelayedAckException.java | 58 +++
.../ignite/internal/client/TcpClientChannel.java | 29 +-
.../internal/client/table/ClientTableMapUtils.java | 11 +-
.../internal/client/tx/ClientTransaction.java | 25 +-
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../ignite/internal/replicator/ReplicaManager.java | 19 +-
.../internal/client/ItClientDirectMappingTest.java | 17 -
.../app/client/ItAbstractThinClientTest.java | 14 +-
.../app/client/ItThinClientTransactionsTest.java | 44 +--
...ClientTransactionsWithBrokenReplicatorTest.java | 389 +++++++++++++++++++++
.../replicator/PartitionReplicaListener.java | 18 +-
.../ignite/internal/tx/DelayedAckException.java | 52 +++
.../tx/TransactionExceptionMapperProvider.java | 2 +
.../internal/tx/impl/TransactionInflights.java | 64 +++-
.../ignite/internal/tx/impl/TxManagerImpl.java | 16 +-
20 files changed, 706 insertions(+), 68 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 9bd5cd73357..d62069fc500 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -453,6 +453,9 @@ public class ErrorGroups {
/** Operation failed because the transaction is already finished with
timeout. */
public static final int TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR =
TX_ERR_GROUP.registerErrorCode((short) 16);
+
+ /** Operation failed due to a delayed replication ack failure. */
+ public static final int TX_DELAYED_ACK_ERR =
TX_ERR_GROUP.registerErrorCode((short) 17);
}
/** Replicator error group. */
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index 3e4229ec18f..c7ce644f608 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -24,4 +24,6 @@ public class ErrorExtensions {
public static final String EXPECTED_SCHEMA_VERSION = "expected-schema-ver";
public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
+
+ public static final String DELAYED_ACK = "delayed-ack";
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 01dea112d25..0432a9934c7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -166,6 +166,7 @@ import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
+import org.apache.ignite.internal.tx.DelayedAckException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CancelHandle;
@@ -686,6 +687,7 @@ public class ClientInboundMessageHandler
private void writeErrorCore(Throwable err, ClientMessagePacker packer) {
SchemaVersionMismatchException schemaVersionMismatchException =
findException(err, SchemaVersionMismatchException.class);
SqlBatchException sqlBatchException = findException(err,
SqlBatchException.class);
+ DelayedAckException delayedAckException = findException(err,
DelayedAckException.class);
err = firstNotNull(
schemaVersionMismatchException,
@@ -727,6 +729,10 @@ public class ClientInboundMessageHandler
packer.packInt(1); // 1 extension.
packer.packString(ErrorExtensions.SQL_UPDATE_COUNTERS);
packer.packLongArray(sqlBatchException.updateCounters());
+ } else if (delayedAckException != null) {
+ packer.packInt(1); // 1 extension.
+ packer.packString(ErrorExtensions.DELAYED_ACK);
+ packer.packUuid(delayedAckException.txId());
} else {
packer.packNil(); // No extensions.
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
new file mode 100644
index 00000000000..ef9e06513b5
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.UUID;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holds the transaction id and the cause for delayed replication ack failure.
+ */
+public class ClientDelayedAckException extends IgniteInternalException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Transaction id. */
+ private final UUID txId;
+
+ /**
+ * Constructor.
+ *
+ * @param traceId Trace ID.
+ * @param code Error code.
+ *
+ * @param message String message.
+ * @param txId Related transaction id.
+ * @param cause Cause.
+ */
+ ClientDelayedAckException(UUID traceId, int code, @Nullable String
message, UUID txId, @Nullable Throwable cause) {
+ super(traceId, code, message, cause);
+
+ this.txId = txId;
+ }
+
+ /**
+ * Gets the id of the transaction related to the delayed ack failure.
+ *
+ * @return Transaction id.
+ */
+ public UUID txId() {
+ return txId;
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 2eae304895e..1d86556a9b3 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -75,6 +75,7 @@ import org.apache.ignite.lang.ErrorGroups.Table;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
/**
@@ -590,6 +591,17 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (handler == null) {
// Default notification handler. Used to deliver delayed
replication acks.
+ if (err != null) {
+ if (err instanceof ClientDelayedAckException) {
+ ClientDelayedAckException err0 =
(ClientDelayedAckException) err;
+
+ inflights.removeInflight(err0.txId(), new
TransactionException(err0.code(), err0.getMessage(), err0.getCause()));
+ }
+
+ // Can't do anything to remove stuck inflight.
+ return;
+ }
+
UUID txId = unpacker.unpackUuid();
inflights.removeInflight(txId, err);
@@ -617,6 +629,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
int expectedSchemaVersion = -1;
long[] sqlUpdateCounters = null;
+ UUID txId = null;
for (int i = 0; i < extSize; i++) {
String key = unpacker.unpackString();
@@ -625,12 +638,23 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
expectedSchemaVersion = unpacker.unpackInt();
} else if (key.equals(ErrorExtensions.SQL_UPDATE_COUNTERS)) {
sqlUpdateCounters = unpacker.unpackLongArray();
+ } else if (key.equals(ErrorExtensions.DELAYED_ACK)) {
+ txId = unpacker.unpackUuid();
} else {
// Unknown extension - ignore.
unpacker.skipValues(1);
}
}
+ if (txId != null) {
+ return new ClientDelayedAckException(traceId, code, errMsg, txId,
causeWithStackTrace);
+ }
+
+ if (sqlUpdateCounters != null) {
+ errMsg = errMsg != null ? errMsg : "SQL batch execution error";
+ return new SqlBatchException(traceId, code, sqlUpdateCounters,
errMsg, causeWithStackTrace);
+ }
+
if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
if (expectedSchemaVersion == -1) {
return new IgniteException(
@@ -640,11 +664,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
return new ClientSchemaVersionMismatchException(traceId, code,
errMsg, expectedSchemaVersion, causeWithStackTrace);
}
- if (sqlUpdateCounters != null) {
- errMsg = errMsg != null ? errMsg : "SQL batch execution error";
- return new SqlBatchException(traceId, code, sqlUpdateCounters,
errMsg, causeWithStackTrace);
- }
-
try {
Class<? extends Throwable> errCls = (Class<? extends Throwable>)
Class.forName(errClassName);
return copyExceptionWithCause(errCls, traceId, code, errMsg,
causeWithStackTrace);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
index 826b08289e4..49f9f6ce3b6 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.util.CompletableFutures;
@@ -126,7 +127,10 @@ class ClientTableMapUtils {
}
Transaction tx0 = txns.get(i);
tx0.rollbackAsync().exceptionally(e -> {
- log.error("Failed to rollback a transactional batch: [tx=" +
tx0 + ']', e);
+ Throwable cause = unwrapCause(e);
+ if (!(cause instanceof IgniteClientConnectionException)) {
+ log.error("Failed to rollback a transactional batch: [tx="
+ tx0 + ']', cause);
+ }
return null;
});
}
@@ -141,7 +145,10 @@ class ClientTableMapUtils {
for (Transaction txn : txns) {
ClientLazyTransaction tx0 = (ClientLazyTransaction) txn;
CompletableFuture<Void> fut = tx0.commitAsync().exceptionally(e ->
{
- log.error("Failed to commit a transactional batch: [tx=" + tx0
+ ']', e);
+ Throwable cause = unwrapCause(e);
+ if (!(cause instanceof IgniteClientConnectionException)) {
+ log.error("Failed to commit a transactional batch: [tx=" +
tx0 + ']', cause);
+ }
return null;
});
// Enforce sync commit to avoid lock conflicts then working in
compatibility mode.
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 2eceadf027f..74a7d8c3ac4 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client.tx;
+import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -221,13 +222,27 @@ public class ClientTransaction implements Transaction {
boolean enabled =
ch.protocolContext().isFeatureSupported(TX_PIGGYBACK);
CompletableFuture<Void> finishFut = enabled ?
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
- CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
- w.out().packLong(id);
+ CompletableFuture<Void> mainFinishFut = finishFut.handle((ignored, e)
-> {
+ if (e != null) {
+ ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> {
+ w.out().packLong(id);
- if (!isReadOnly && enabled) {
- packEnlisted(w);
+ if (!isReadOnly && enabled) {
+ packEnlisted(w);
+ }
+ }, r -> null);
+
+ return CompletableFuture.<Void>failedFuture(e);
}
- }, r -> null));
+
+ return ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
+ w.out().packLong(id);
+
+ if (!isReadOnly && enabled) {
+ packEnlisted(w);
+ }
+ }, r -> (Void) null);
+ }).thenCompose(identity());
mainFinishFut.handle((res, e) -> {
setState(STATE_COMMITTED);
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index f6d47e1ea33..ea8036ad0ec 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -137,6 +137,7 @@ enum class code : underlying_t {
TX_STALE_OPERATION = 0x7000e,
TX_STALE_READ_ONLY_OPERATION = 0x7000f,
TX_ALREADY_FINISHED_WITH_TIMEOUT = 0x70010,
+ TX_DELAYED_ACK = 0x70011,
// Replicator group. Group code: 8
REPLICA_COMMON = 0x80001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index ff092b83a14..4eefe311cf4 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -208,6 +208,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::TX_STALE_OPERATION:
case error::code::TX_STALE_READ_ONLY_OPERATION:
case error::code::TX_ALREADY_FINISHED_WITH_TIMEOUT:
+ case error::code::TX_DELAYED_ACK:
return sql_state::S25000_INVALID_TRANSACTION_STATE;
// Replicator group. Group code: 8
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 547daee778a..8355f91e01c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -364,6 +364,9 @@ namespace Apache.Ignite
/// <summary> TxAlreadyFinishedWithTimeout error. </summary>
public const int TxAlreadyFinishedWithTimeout = (GroupCode << 16)
| (16 & 0xFFFF);
+
+ /// <summary> TxDelayedAck error. </summary>
+ public const int TxDelayedAck = (GroupCode << 16) | (17 & 0xFFFF);
}
/// <summary> Replicator errors. </summary>
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 57aa60e5242..9d11528e820 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -420,7 +420,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
LOG.debug("Sending delayed response for
replica request [request={}]", request);
if (ex0 == null) {
- msg0 =
prepareReplicaResponse(sendTimestamp, new ReplicaResult(res0, null));
+ msg0 =
prepareDelayedReplicaResponse(sendTimestamp, res0);
} else {
if (indicatesUnexpectedProblem(ex0)) {
LOG.warn("Failed to process delayed
response [request={}]", ex0, request);
@@ -1104,7 +1104,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return REPLICA_MESSAGES_FACTORY
.errorTimestampAwareReplicaResponse()
.throwable(ex)
- .timestamp(clockService.now())
+ .timestamp(clockService.current())
.build();
} else {
return REPLICA_MESSAGES_FACTORY
@@ -1114,6 +1114,21 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
}
+ private NetworkMessage prepareDelayedReplicaResponse(boolean
sendTimestamp, Object result) {
+ if (sendTimestamp) {
+ return REPLICA_MESSAGES_FACTORY
+ .timestampAwareReplicaResponse()
+ .result(result)
+ .timestamp(clockService.current())
+ .build();
+ } else {
+ return REPLICA_MESSAGES_FACTORY
+ .replicaResponse()
+ .result(result)
+ .build();
+ }
+ }
+
/**
* Idle safe time sync for replicas.
*/
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
index d4f87d39755..b671b25ca24 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
@@ -23,13 +23,9 @@ import static
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTx;
import static
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTxVoid;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.function.Predicate;
-import java.util.stream.IntStream;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
-import org.apache.ignite.internal.TestWrappers;
-import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
@@ -129,19 +125,6 @@ public class ItClientDirectMappingTest extends
ClusterPerTestIntegrationTest {
return NODE_BOOTSTRAP_CFG_TEMPLATE;
}
- private IgniteImpl findNode(int startRange, int endRange,
Predicate<IgniteImpl> filter) {
- return IntStream.range(startRange, endRange)
- .mapToObj(this::node)
- .map(TestWrappers::unwrapIgniteImpl)
- .filter(filter::test)
- .findFirst()
- .get();
- }
-
- private IgniteImpl findNodeByName(String leaseholder) {
- return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
- }
-
@Override
protected int initialNodes() {
return 2;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 243819b87c6..f9d20b46339 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.internal.raft.configuration.RaftConfigurationSchema;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -66,6 +67,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(WorkDirectoryExtension.class)
public abstract class ItAbstractThinClientTest extends BaseIgniteAbstractTest {
+ protected static final String ZONE_NAME = "TEST_ZONE";
+
protected static final String TABLE_NAME = "TBL1";
protected static final String COLUMN_KEY = "key";
@@ -97,8 +100,9 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
+ (i == 1 ? ("
clientConnector.sendServerExceptionStackTraceToClient: true\n"
+ " clientConnector.metricsEnabled: true\n") : "")
+ " clientConnector.port: " + (10800 + i) + ",\n"
- + " rest.port: " + (10300 + i) + "\n"
- + " compute.threadPoolSize: 1\n"
+ + " rest.port: " + (10300 + i) + ",\n"
+ + " compute.threadPoolSize: 1,\n"
+ + " raft.retryTimeoutMillis: " +
raftTimeoutMillis()
+ "}"
);
}
@@ -124,7 +128,7 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
IgniteSql sql = startedNodes.get(0).sql();
- sql.execute(null, "CREATE ZONE TEST_ZONE (REPLICAS " + replicas() + ",
PARTITIONS " + PARTITIONS + ") STORAGE PROFILES ['"
+ sql.execute(null, "CREATE ZONE " + ZONE_NAME + " (REPLICAS " +
replicas() + ", PARTITIONS " + PARTITIONS + ") STORAGE PROFILES ['"
+ DEFAULT_STORAGE_PROFILE + "']");
sql.execute(null, "CREATE TABLE " + TABLE_NAME + "("
+ COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR)
ZONE TEST_ZONE");
@@ -187,6 +191,10 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
return DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
}
+ protected long raftTimeoutMillis() {
+ return new RaftConfigurationSchema().retryTimeoutMillis;
+ }
+
protected IgniteClient client() {
return client;
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 2d31607b24b..2d94a465045 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.runner.app.client;
import static java.util.Collections.emptyList;
import static java.util.Comparator.comparing;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
@@ -49,7 +50,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.ClientTransactionInflights;
@@ -450,10 +450,10 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertTrue(coordIdx != -1);
- IgniteImpl coord = TestWrappers.unwrapIgniteImpl(server(coordIdx));
+ IgniteImpl coord = unwrapIgniteImpl(server(coordIdx));
assertNotNull(coord.txManager().stateMeta(txId), "Transaction expected
to be colocated with enlistment");
- IgniteImpl other = TestWrappers.unwrapIgniteImpl(server(1 - coordIdx));
+ IgniteImpl other = unwrapIgniteImpl(server(1 - coordIdx));
do {
k++;
@@ -620,7 +620,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientLazyTransaction tx0 = (ClientLazyTransaction)
client().transactions().begin();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
List<Tuple> tuples0 = generateKeysForNode(200, 2, map,
server0.cluster().localNode(), table);
@@ -650,8 +650,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(300, 1, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(310, 1, map,
server1.cluster().localNode(), table);
@@ -682,8 +682,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(400, 1, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(410, 1, map,
server1.cluster().localNode(), table);
@@ -731,8 +731,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(500, 1, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(510, 80, map,
server1.cluster().localNode(), table);
@@ -858,8 +858,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(600, 50, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(610, 50, map,
server1.cluster().localNode(), table);
@@ -889,8 +889,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
@@ -954,8 +954,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> keys0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
List<Tuple> keys1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
@@ -1019,8 +1019,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
ClientTable table = (ClientTable) table();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(700, 50, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(710, 50, map,
server1.cluster().localNode(), table);
@@ -1097,8 +1097,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
// Load partition map to ensure all entries are directly mapped.
Map<Partition, ClusterNode> map =
table.partitionManager().primaryReplicasAsync().join();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(600, 20, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(600, 20, map,
server1.cluster().localNode(), table);
@@ -1135,8 +1135,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
// Load partition map to ensure all entries are directly mapped.
Map<Partition, ClusterNode> map =
table.partitionManager().primaryReplicasAsync().join();
- IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
List<Tuple> tuples0 = generateKeysForNode(600, 20, map,
server0.cluster().localNode(), table);
List<Tuple> tuples1 = generateKeysForNode(600, 20, map,
server1.cluster().localNode(), table);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithBrokenReplicatorTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithBrokenReplicatorTest.java
new file mode 100644
index 00000000000..18da22a1173
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithBrokenReplicatorTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static
org.apache.ignite.internal.ReplicationGroupsUtils.zonePartitionIds;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest.generateKeysForPartition;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.AppendEntriesRequestImpl;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests thin client transaction failure handling with a broken replicator.
+ */
+public class ItThinClientTransactionsWithBrokenReplicatorTest extends
ItAbstractThinClientTest {
+ @Override
+ protected long raftTimeoutMillis() {
+ return TimeUnit.SECONDS.toMillis(2); // Set small retry timeout to
reduce the test execution time.
+ }
+
+ @Test
+ public void testErrorDuringDirectMappingSinglePartitionTransaction() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+ List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+ ZonePartitionId part0 = replicationGroupIds.get(0);
+
+ List<Tuple> tuples0 = generateKeysForPartition(100, 1, map,
part0.partitionId(), table);
+
+ FaultyAppendEntriesRequestProcessor proc0 =
installFaultyAppendEntriesProcessor(server0);
+ proc0.setFaultyGroup(part0);
+
+ FaultyAppendEntriesRequestProcessor proc1 =
installFaultyAppendEntriesProcessor(server1);
+ proc1.setFaultyGroup(part0);
+
+ KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+ Transaction tx = client().transactions().begin();
+ tupleView.put(tx, tuples0.get(0), val(tuples0.get(0).intValue(0) +
""));
+
+ try {
+ tx.commit();
+ fail("Expecting commit failure");
+ } catch (TransactionException exception) {
+ assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+ }
+
+ proc0.setFaultyGroup(null);
+ proc1.setFaultyGroup(null);
+
+ assertNull(tupleView.get(null, tuples0.get(0)));
+ }
+
+ @Test
+ public void testErrorDuringDirectMappingSinglePartitionTransactionBatch() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+ List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+ ZonePartitionId part0 = replicationGroupIds.get(0);
+
+ List<Tuple> tuples0 = generateKeysForPartition(200, 2, map,
part0.partitionId(), table);
+
+ Map<Tuple, Tuple> batch = new HashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ FaultyAppendEntriesRequestProcessor proc0 =
installFaultyAppendEntriesProcessor(server0);
+ proc0.setFaultyGroup(part0);
+
+ FaultyAppendEntriesRequestProcessor proc1 =
installFaultyAppendEntriesProcessor(server1);
+ proc1.setFaultyGroup(part0);
+
+ KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+ Transaction tx = client().transactions().begin();
+ tupleView.putAll(tx, batch);
+
+ try {
+ tx.commit();
+ fail("Expecting commit failure");
+ } catch (TransactionException exception) {
+ assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+ }
+
+ proc0.setFaultyGroup(null);
+ proc1.setFaultyGroup(null);
+
+ assertTrue(tupleView.getAll(null, batch.keySet()).isEmpty());
+ }
+
+ @Test
+ public void testErrorDuringDirectMappingTwoPartitionTransaction() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+ Map<Integer, ClusterNode> mapPartById =
map.entrySet().stream().collect(Collectors.toMap(
+ entry -> ((HashPartition) entry.getKey()).partitionId(),
+ Entry::getValue
+ ));
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+ List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+ ZonePartitionId part0 = replicationGroupIds.get(0);
+ ClusterNode firstNode = mapPartById.get(part0.partitionId());
+
+ ZonePartitionId part1 = null;
+
+ // We need to find a partition on another node.
+ for (int i = 1; i < replicationGroupIds.size(); i++) {
+ ZonePartitionId tmp = replicationGroupIds.get(i);
+ ClusterNode otherNode = mapPartById.get(tmp.partitionId());
+ if (!otherNode.equals(firstNode)) {
+ part1 = tmp;
+ break;
+ }
+ }
+
+ assertNotNull(part1);
+
+ List<Tuple> tuples0 = generateKeysForPartition(300, 1, map,
part0.partitionId(), table);
+ List<Tuple> tuples1 = generateKeysForPartition(310, 1, map,
part1.partitionId(), table);
+
+ FaultyAppendEntriesRequestProcessor proc0 =
installFaultyAppendEntriesProcessor(server0);
+ proc0.setFaultyGroup(part1);
+
+ FaultyAppendEntriesRequestProcessor proc1 =
installFaultyAppendEntriesProcessor(server1);
+ proc1.setFaultyGroup(part1);
+
+ KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+ Transaction tx = client().transactions().begin();
+
+ Map<Tuple, Tuple> batch = new LinkedHashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ for (Tuple tup : tuples1) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ Iterator<Entry<Tuple, Tuple>> iter = batch.entrySet().iterator();
+ Entry<Tuple, Tuple> first = iter.next();
+
+ tupleView.put(tx, first.getKey(), first.getValue());
+
+ // Directly mapped request, will cause client inflight failure.
+ Entry<Tuple, Tuple> second = iter.next();
+ tupleView.put(tx, second.getKey(), second.getValue());
+
+ try {
+ tx.commit();
+ fail("Expecting commit failure");
+ } catch (TransactionException exception) {
+ assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+ }
+
+ proc0.setFaultyGroup(null);
+ proc1.setFaultyGroup(null);
+
+ assertTrue(tupleView.getAll(null, batch.keySet()).isEmpty());
+ }
+
+ @Test
+ public void testErrorDuringDirectMappingTwoPartitionTransactionColocated()
{
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+ Map<Integer, ClusterNode> mapPartById =
map.entrySet().stream().collect(Collectors.toMap(
+ entry -> ((HashPartition) entry.getKey()).partitionId(),
+ Entry::getValue
+ ));
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+
+ List<ZonePartitionId> replicationGroupIds = getPartitions(server0);
+ ZonePartitionId part0 = replicationGroupIds.get(0);
+ ClusterNode firstNode = mapPartById.get(part0.partitionId());
+
+ ZonePartitionId part1 = null;
+
+ // We need to find a partition on the same node.
+ for (int i = 1; i < replicationGroupIds.size(); i++) {
+ ZonePartitionId tmp = replicationGroupIds.get(i);
+ ClusterNode otherNode = mapPartById.get(tmp.partitionId());
+ if (otherNode.equals(firstNode)) {
+ part1 = tmp;
+ break;
+ }
+ }
+
+ assertNotNull(part1);
+
+ List<Tuple> tuples0 = generateKeysForPartition(400, 1, map,
part0.partitionId(), table);
+ List<Tuple> tuples1 = generateKeysForPartition(410, 1, map,
part1.partitionId(), table);
+
+ FaultyAppendEntriesRequestProcessor proc0 =
installFaultyAppendEntriesProcessor(server0);
+ proc0.setFaultyGroup(part1);
+
+ FaultyAppendEntriesRequestProcessor proc1 =
installFaultyAppendEntriesProcessor(server1);
+ proc1.setFaultyGroup(part1);
+
+ KeyValueView<Tuple, Tuple> tupleView = table.keyValueView();
+
+ Transaction tx = client().transactions().begin();
+
+ Map<Tuple, Tuple> batch = new LinkedHashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ for (Tuple tup : tuples1) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ Iterator<Entry<Tuple, Tuple>> iter = batch.entrySet().iterator();
+ Entry<Tuple, Tuple> first = iter.next();
+
+ tupleView.put(tx, first.getKey(), first.getValue());
+
+ // Colocated request, will cause server inflight failure.
+ Entry<Tuple, Tuple> second = iter.next();
+ tupleView.put(tx, second.getKey(), second.getValue());
+
+ try {
+ tx.commit();
+ fail("Expecting commit failure");
+ } catch (TransactionException exception) {
+ assertEquals(Transactions.TX_DELAYED_ACK_ERR, exception.code());
+ }
+
+ proc0.setFaultyGroup(null);
+ proc1.setFaultyGroup(null);
+
+ assertTrue(tupleView.getAll(null, batch.keySet()).isEmpty());
+ }
+
+ private Table table() {
+ return client().tables().tables().get(0);
+ }
+
+ private static Tuple val(String v) {
+ return Tuple.create().set(COLUMN_VAL, v);
+ }
+
+ private static Tuple key(Integer k) {
+ return Tuple.create().set(COLUMN_KEY, k);
+ }
+
+ @Override
+ protected int replicas() {
+ return 2;
+ }
+
+ private static FaultyAppendEntriesRequestProcessor
installFaultyAppendEntriesProcessor(IgniteImpl node) {
+ RaftServer raftServer = node.raftManager().server();
+ RpcServer<?> rpcServer = getFieldValue(raftServer,
JraftServerImpl.class, "rpcServer");
+ Map<String, RpcProcessor<?>> processors = getFieldValue(rpcServer,
IgniteRpcServer.class, "processors");
+
+ AppendEntriesRequestProcessor originalProcessor =
+ (AppendEntriesRequestProcessor)
processors.get(AppendEntriesRequest.class.getName());
+ Executor appenderExecutor = getFieldValue(originalProcessor,
RpcRequestProcessor.class, "executor");
+ RaftMessagesFactory raftMessagesFactory =
getFieldValue(originalProcessor, RpcRequestProcessor.class, "msgFactory");
+
+ FaultyAppendEntriesRequestProcessor blockingProcessor = new
FaultyAppendEntriesRequestProcessor(
+ appenderExecutor,
+ raftMessagesFactory
+ );
+
+ rpcServer.registerProcessor(blockingProcessor);
+
+ return blockingProcessor;
+ }
+
+ private static class FaultyAppendEntriesRequestProcessor extends
AppendEntriesRequestProcessor {
+ private volatile @Nullable String partId;
+
+ FaultyAppendEntriesRequestProcessor(Executor executor,
RaftMessagesFactory msgFactory) {
+ super(executor, msgFactory);
+ }
+
+ @Override
+ public Message processRequest0(RaftServerService service,
AppendEntriesRequest request, RpcRequestClosure done) {
+ boolean isHeartbeat = JRaftUtils.isHeartbeatRequest(request);
+
+ if (partId != null && partId.equals(request.groupId()) &&
!isHeartbeat) {
+ // This response puts the replicator into an endless retry loop.
+ return RaftRpcFactory.DEFAULT //
+ .newResponse(done.getMsgFactory(), RaftError.EINTERNAL,
+ "Fail AppendEntries on '%s'.",
request.groupId());
+ }
+
+ return super.processRequest0(service, request, done);
+ }
+
+ void setFaultyGroup(@Nullable ZonePartitionId partId) {
+ this.partId = partId == null ? null : partId.toString();
+ }
+
+ @Override
+ public String interest() {
+ return AppendEntriesRequestImpl.class.getName();
+ }
+ }
+
+ private static List<ZonePartitionId> getPartitions(IgniteImpl server) {
+ Catalog catalog =
server.catalogManager().catalog(server.catalogManager().latestCatalogVersion());
+ CatalogZoneDescriptor zoneDescriptor =
catalog.zone(ZONE_NAME.toUpperCase());
+ List<ZonePartitionId> replicationGroupIds = zonePartitionIds(server,
zoneDescriptor.id());
+ return replicationGroupIds;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fb33446034a..a0cd0633951 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -43,6 +43,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollection
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -170,6 +171,7 @@ import
org.apache.ignite.internal.table.distributed.TableUtils;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
+import org.apache.ignite.internal.tx.DelayedAckException;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
@@ -2296,7 +2298,13 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
);
}
- CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
+ CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
+ if (e != null) {
+ throw new DelayedAckException(cmd.txId(),
unwrapCause(e));
+ }
+
+ return cmd.txId();
+ });
return completedFuture(new CommandApplicationResult(null,
repFut));
}
@@ -2429,7 +2437,13 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
);
}
- CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());
+ CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
+ if (e != null) {
+ throw new DelayedAckException(cmd.txId(), unwrapCause(e));
+ }
+
+ return cmd.txId();
+ });
return completedFuture(new CommandApplicationResult(null, repFut));
} else {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DelayedAckException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DelayedAckException.java
new file mode 100644
index 00000000000..decab36b942
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DelayedAckException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_DELAYED_ACK_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Holds the transaction id and the cause of a delayed replication ack failure.
+ */
+public class DelayedAckException extends IgniteInternalException {
+ private static final long serialVersionUID = 0L;
+
+ private final UUID txId;
+
+ /**
+ * Create the exception with txId and cause.
+ *
+ * @param txId The transaction id.
+ * @param cause The cause.
+ */
+ public DelayedAckException(UUID txId, Throwable cause) {
+ super(TX_DELAYED_ACK_ERR, "Failed to commit the transaction due to
failed replication acknowledgement [txId=" + txId + ']', cause);
+ this.txId = txId;
+ }
+
+ /**
+ * Get the transaction id.
+ *
+ * @return The id.
+ */
+ public UUID txId() {
+ return txId;
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
index 35af89caa08..da5311e8da7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
@@ -46,6 +46,8 @@ public class TransactionExceptionMapperProvider implements
IgniteExceptionMapper
err -> new
MismatchingTransactionOutcomeException(err.traceId(), err.code(),
err.getMessage(), err)));
mappers.add(unchecked(IncompatibleSchemaAbortException.class,
err -> new IncompatibleSchemaException(err.traceId(),
err.code(), err.getMessage(), err)));
+ mappers.add(unchecked(DelayedAckException.class,
+ err -> new TransactionException(err.traceId(), err.code(),
err.getMessage(), err.getCause())));
return mappers;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 5e8f4e4511f..c6c79c1c89c 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -170,7 +170,26 @@ public class TransactionInflights {
// Avoid completion under lock.
if (tuple != null) {
- tuple.onInflightsRemoved();
+ tuple.onInflightRemoved(tuple.err);
+ }
+ }
+
+ void removeInflight(UUID txId, Throwable cause) {
+ // Can be null if tx was aborted and inflights were removed from the
collection.
+ TxContext tuple = txCtxMap.computeIfPresent(txId, (uuid, ctx) -> {
+ if (cause != null && ctx.err == null) {
+ ctx.err = cause; // Retain only first exception.
+ }
+
+ // Update inflight counter after assigning error value to avoid
issues with visibility.
+ ctx.removeInflight(txId);
+
+ return ctx;
+ });
+
+ // Avoid completion under lock.
+ if (tuple != null) {
+ tuple.onInflightRemoved(tuple.err);
}
}
@@ -251,6 +270,7 @@ public class TransactionInflights {
abstract static class TxContext {
volatile long inflights = 0; // Updated under lock.
+ Throwable err;
boolean addInflight() {
if (isTxFinishing()) {
@@ -269,7 +289,7 @@ public class TransactionInflights {
inflights--;
}
- abstract void onInflightsRemoved();
+ abstract void onInflightRemoved(@Nullable Throwable t);
abstract void finishTx(@Nullable Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups);
@@ -293,7 +313,7 @@ public class TransactionInflights {
}
@Override
- public void onInflightsRemoved() {
+ void onInflightRemoved(Throwable t) {
// No-op.
}
@@ -337,12 +357,29 @@ public class TransactionInflights {
}
CompletableFuture<Void> performFinish(boolean commit,
Function<Boolean, CompletableFuture<Void>> finishAction) {
- waitReadyToFinish(commit).whenComplete((ignoredReadyToFinish,
readyException) -> {
+ waitReadyToFinish(commit).whenComplete((ignored, readyException)
-> {
try {
- CompletableFuture<Void> actionFut =
finishAction.apply(commit && readyException == null);
+ if (commit) {
+ if (readyException == null) {
+ CompletableFuture<Void> actionFut =
finishAction.apply(true);
+
+ actionFut.whenComplete((ignoredFinishActionResult,
finishException) ->
+ completeFinishInProgressFuture(true, null,
finishException));
+ } else {
+ // If we got a ready exception, that means some of the
enlisted partitions could be broken/unavailable.
+ // Respond to the caller with the commit failure
immediately to reduce the potential unavailability window.
+ completeFinishInProgressFuture(true,
readyException, null);
+
+ finishAction.apply(false);
+ }
+
+ return;
+ }
+
+ CompletableFuture<Void> actionFut =
finishAction.apply(false);
actionFut.whenComplete((ignoredFinishActionResult,
finishException) ->
- completeFinishInProgressFuture(commit,
readyException, finishException));
+ completeFinishInProgressFuture(false,
readyException, finishException));
} catch (Throwable err) {
completeFinishInProgressFuture(commit, readyException,
err);
}
@@ -407,8 +444,13 @@ public class TransactionInflights {
}
private CompletableFuture<Void> waitNoInflights() {
+ // No new inflights are possible because the tx is locked for update.
if (inflights == 0) {
- waitRepFut.complete(null);
+ if (err != null) {
+ waitRepFut.completeExceptionally(err);
+ } else {
+ waitRepFut.complete(null);
+ }
}
return waitRepFut;
}
@@ -418,9 +460,13 @@ public class TransactionInflights {
}
@Override
- public void onInflightsRemoved() {
+ void onInflightRemoved(@Nullable Throwable t) {
if (inflights == 0 && finishInProgressFuture != null) {
- waitRepFut.complete(null);
+ if (t == null) {
+ waitRepFut.complete(null);
+ } else {
+ waitRepFut.completeExceptionally(t);
+ }
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 83231bc39a0..f366de3c722 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -97,6 +97,7 @@ import
org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.tx.DelayedAckException;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LocalRwTxCounter;
@@ -1213,8 +1214,21 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return;
}
- // Ignore error responses here. A transaction will be rolled back in
other place.
+ // Only remove the failed inflight here. The transaction will be
rolled back elsewhere.
if (message instanceof ErrorReplicaResponse) {
+ ErrorReplicaResponse response = (ErrorReplicaResponse) message;
+
+ Throwable err = response.throwable();
+
+ Throwable cause = ExceptionUtils.unwrapCause(err);
+
+ if (cause instanceof DelayedAckException) {
+ // Keep compatibility.
+ DelayedAckException err0 = (DelayedAckException) cause;
+
+ transactionInflights.removeInflight(err0.txId(), err0);
+ }
+
return;
}