This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d93bc6f16e IGNITE-18358 IGN-TX-5 on concurrent transactional single
key load (#1563)
d93bc6f16e is described below
commit d93bc6f16ee7dc35490940451a5ac82320fcff25
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Jan 27 13:34:12 2023 +0400
IGNITE-18358 IGN-TX-5 on concurrent transactional single key load (#1563)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 17 ++--
.../ignite/internal/replicator/ReplicaService.java | 24 ++---
.../runner/app/ItTableApiContractTest.java | 113 +++++++++++----------
.../distributed/storage/InternalTableImpl.java | 68 +++++++------
.../apache/ignite/internal/tx/LockException.java | 6 +-
5 files changed, 119 insertions(+), 109 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index dcdb73e759..f9a7a38024 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -265,29 +265,26 @@ public class ErrorGroups {
/** Error of unexpected tx state on state change. */
public static final int TX_UNEXPECTED_STATE_ERR =
TX_ERR_GROUP.registerErrorCode(3);
- /** Failed to release a lock on a key. */
- public static final int RELEASE_LOCK_ERR =
TX_ERR_GROUP.registerErrorCode(4);
-
/** Failed to acquire a lock on a key due to a conflict. */
- public static final int ACQUIRE_LOCK_ERR =
TX_ERR_GROUP.registerErrorCode(5);
+ public static final int ACQUIRE_LOCK_ERR =
TX_ERR_GROUP.registerErrorCode(4);
/** Failed to acquire a lock on a key within a timeout. */
- public static final int ACQUIRE_LOCK_TIMEOUT_ERR =
TX_ERR_GROUP.registerErrorCode(6);
+ public static final int ACQUIRE_LOCK_TIMEOUT_ERR =
TX_ERR_GROUP.registerErrorCode(5);
/** Failed to commit a transaction. */
- public static final int TX_COMMIT_ERR =
TX_ERR_GROUP.registerErrorCode(7);
+ public static final int TX_COMMIT_ERR =
TX_ERR_GROUP.registerErrorCode(6);
/** Failed to rollback a transaction. */
- public static final int TX_ROLLBACK_ERR =
TX_ERR_GROUP.registerErrorCode(8);
+ public static final int TX_ROLLBACK_ERR =
TX_ERR_GROUP.registerErrorCode(7);
/** Failed to enlist read-write operation into read-only transaction.
*/
- public static final int TX_FAILED_READ_WRITE_OPERATION_ERR =
TX_ERR_GROUP.registerErrorCode(9);
+ public static final int TX_FAILED_READ_WRITE_OPERATION_ERR =
TX_ERR_GROUP.registerErrorCode(8);
/** The error happens when the replica is not ready to handle a
request. */
- public static final int TX_REPLICA_UNAVAILABLE_ERR =
TX_ERR_GROUP.registerErrorCode(10);
+ public static final int TX_REPLICA_UNAVAILABLE_ERR =
TX_ERR_GROUP.registerErrorCode(9);
/** Tx state storage rebalancing error. */
- public static final int TX_STATE_STORAGE_REBALANCE_ERR =
TX_ERR_GROUP.registerErrorCode(11);
+ public static final int TX_STATE_STORAGE_REBALANCE_ERR =
TX_ERR_GROUP.registerErrorCode(10);
}
/** Replicator error group. */
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 2b9ea6817c..e5c36e1dab 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -96,13 +96,13 @@ public class ReplicaService {
if (throwable instanceof TimeoutException) {
res.get().completeExceptionally(new
ReplicationTimeoutException(req.groupId()));
+ } else {
+ res.get().completeExceptionally(withCause(
+ ReplicationException::new,
+ REPLICA_COMMON_ERR,
+ "Failed to process replica request
[replicaGroupId=" + req.groupId() + ']',
+ throwable));
}
-
- res.get().completeExceptionally(withCause(
- ReplicationException::new,
- REPLICA_COMMON_ERR,
- "Failed to process replica request [replicaGroupId=" +
req.groupId() + ']',
- throwable));
} else {
assert response instanceof ReplicaResponse : "Unexpected
message response [resp=" + response + ']';
@@ -134,13 +134,13 @@ public class ReplicaService {
if (throwable0 instanceof
TimeoutException) {
res.get().completeExceptionally(errResp.throwable());
+ } else {
+
res.get().completeExceptionally(withCause(
+ ReplicationException::new,
+ REPLICA_COMMON_ERR,
+ "Failed to process replica
request [replicaGroupId=" + req.groupId() + ']',
+ throwable0));
}
-
- res.get().completeExceptionally(withCause(
- ReplicationException::new,
- REPLICA_COMMON_ERR,
- "Failed to process replica request
[replicaGroupId=" + req.groupId() + ']',
- throwable0));
} else {
res.get().thenCompose(ignore ->
sendToReplica(node, req));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index a746ab279b..7e2248502c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -32,9 +33,14 @@ import
org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
@@ -73,9 +79,53 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
*/
@AfterEach
void afterTest() {
- if (ignite.tables().table(TABLE_NAME) != null) {
- await(tableManager().dropTableAsync(TABLE_NAME));
- }
+ sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ }
+
+ /**
+ * Executes before test.
+ */
+ @BeforeEach
+ void beforeTest() {
+ sql("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (name VARCHAR
PRIMARY KEY, balance INT NOT NULL)");
+ }
+
+ /**
+ * The test invokes various API methods on KeyValue and Record views when
the key is locked.
+ * The expected behavior is that all the invocations lead to a transaction
exception because the key is locked.
+ */
+ @Test
+ public void tableTransactionExceptionContract() {
+ KeyValueView<String, Integer> kv =
ignite.tables().table(TABLE_NAME).keyValueView(String.class, Integer.class);
+
+ var tx = ignite.transactions().begin();
+
+ kv.put(tx, "k1", 1);
+
+ assertThrowsExactly(TransactionException.class, () -> kv.put(null,
"k1", 2));
+ assertThrowsExactly(TransactionException.class, () -> kv.get(null,
"k1"));
+ assertThrowsExactly(TransactionException.class, () -> kv.remove(null,
"k1"));
+ assertThrowsExactly(TransactionException.class, () -> kv.remove(null,
"k1", 1));
+ assertThrowsExactly(TransactionException.class, () ->
kv.contains(null, "k1"));
+ assertThrowsExactly(TransactionException.class, () -> kv.replace(null,
"k1", 2));
+
+ tx.rollback();
+
+ RecordView<Tuple> recordView =
ignite.tables().table(TABLE_NAME).recordView();
+
+ tx = ignite.transactions().begin();
+
+ recordView.insert(tx, Tuple.create().set("name", "k1").set("balance",
1));
+
+ assertThrowsExactly(TransactionException.class, () ->
recordView.insert(null, Tuple.create().set("name", "k1").set("balance", 2)));
+ assertThrowsExactly(TransactionException.class, () ->
recordView.upsert(null, Tuple.create().set("name", "k1").set("balance", 2)));
+ assertThrowsExactly(TransactionException.class, () ->
recordView.get(null, Tuple.create().set("name", "k1")));
+ assertThrowsExactly(TransactionException.class, () ->
recordView.delete(null, Tuple.create().set("name", "k1")));
+ assertThrowsExactly(TransactionException.class,
+ () -> recordView.deleteExact(null, Tuple.create().set("name",
"k1").set("balance", 1)));
+ assertThrowsExactly(TransactionException.class, () ->
recordView.replace(null, Tuple.create().set("name", "k1").set("balance", 2)));
+
+ tx.rollback();
}
/**
@@ -85,15 +135,6 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
*/
@Test
public void testDropTable() throws Exception {
- await(tableManager().createTableAsync(TABLE_NAME, tableChange ->
convert(SchemaBuilders.tableBuilder(SCHEMA, TABLE_NAME)
- .columns(
- SchemaBuilders.column("key", ColumnType.INT64).build(),
- SchemaBuilders.column("val",
ColumnType.string()).build())
- .withPrimaryKey("key")
- .build(), tableChange)
- .changeReplicas(2)
- .changePartitions(10)));
-
CompletableFuture<Void> dropTblFut1 =
tableManager().dropTableAsync(TABLE_NAME);
dropTblFut1.get();
@@ -112,18 +153,9 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
*/
@Test
public void testAlterTable() throws Exception {
- await(tableManager().createTableAsync(TABLE_NAME, tableChange ->
convert(SchemaBuilders.tableBuilder(SCHEMA, TABLE_NAME)
- .columns(
- SchemaBuilders.column("key", ColumnType.INT64).build(),
- SchemaBuilders.column("val",
ColumnType.string()).build())
- .withPrimaryKey("key")
- .build(), tableChange)
- .changeReplicas(2)
- .changePartitions(10)));
-
await(tableManager().alterTableAsync(TABLE_NAME, chng -> {
chng.changeColumns(cols ->
- cols.create("NAME", colChg ->
convert(SchemaBuilders.column("name", ColumnType.string()).asNullable(true)
+ cols.create("NAME_1", colChg ->
convert(SchemaBuilders.column("NAME_1", ColumnType.string()).asNullable(true)
.withDefaultValue("default").build(), colChg)));
return true;
}));
@@ -134,7 +166,7 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
assertThrows(TableNotFoundException.class, () ->
await(tableManager().alterTableAsync(TABLE_NAME + "_not_exist", chng -> {
chng.changeColumns(cols ->
- cols.create("NAME", colChg ->
convert(SchemaBuilders.column("name", ColumnType.string()).asNullable(true)
+ cols.create("NAME_1", colChg ->
convert(SchemaBuilders.column("NAME_1", ColumnType.string()).asNullable(true)
.withDefaultValue("default").build(), colChg)));
return true;
})));
@@ -147,19 +179,10 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
*/
@Test
public void testAlterTableAsync() throws Exception {
- await(tableManager().createTableAsync(TABLE_NAME, tableChange ->
convert(SchemaBuilders.tableBuilder(SCHEMA, TABLE_NAME)
- .columns(
- SchemaBuilders.column("key", ColumnType.INT64).build(),
- SchemaBuilders.column("val",
ColumnType.string()).build())
- .withPrimaryKey("key")
- .build(), tableChange)
- .changeReplicas(2)
- .changePartitions(10)));
-
CompletableFuture<Void> altTblFut1 =
tableManager().alterTableAsync(TABLE_NAME,
chng -> {
chng.changeColumns(cols ->
- cols.create("NAME", colChg ->
convert(SchemaBuilders.column("NAME",
+ cols.create("NAME_1", colChg ->
convert(SchemaBuilders.column("NAME_1",
ColumnType.string()).asNullable(true).withDefaultValue("default").build(),
colChg)));
return true;
});
@@ -167,7 +190,7 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
CompletableFuture<Void> altTblFut2 =
tableManager().alterTableAsync(TABLE_NAME + "_not_exist",
chng -> {
chng.changeColumns(cols ->
- cols.create("NAME", colChg ->
convert(SchemaBuilders.column("NAME",
+ cols.create("NAME_1", colChg ->
convert(SchemaBuilders.column("NAME_1",
ColumnType.string()).asNullable(true).withDefaultValue("default").build(),
colChg)));
return true;
});
@@ -188,15 +211,7 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
*/
@Test
public void testCreateTable() throws Exception {
- Table table = await(tableManager().createTableAsync(TABLE_NAME,
- tableChange -> convert(SchemaBuilders.tableBuilder(SCHEMA,
TABLE_NAME)
- .columns(
- SchemaBuilders.column("key",
ColumnType.INT64).build(),
- SchemaBuilders.column("val",
ColumnType.string()).build())
- .withPrimaryKey("key")
- .build(), tableChange)
- .changeReplicas(2)
- .changePartitions(10)));
+ Table table = ignite.tables().table(TABLE_NAME);
assertNotNull(table);
@@ -219,17 +234,7 @@ public class ItTableApiContractTest extends
AbstractBasicIntegrationTest {
*/
@Test
public void testCreateTableAsync() throws Exception {
- CompletableFuture<Table> tableFut1 = tableManager()
- .createTableAsync(TABLE_NAME, tableChange ->
convert(SchemaBuilders.tableBuilder(SCHEMA, TABLE_NAME)
- .columns(
- SchemaBuilders.column("key",
ColumnType.INT64).build(),
- SchemaBuilders.column("val",
ColumnType.string()).build())
- .withPrimaryKey("key")
- .build(), tableChange)
- .changeReplicas(2)
- .changePartitions(10));
-
- assertNotNull(tableFut1.get());
+ assertNotNull(ignite.tables().table(TABLE_NAME));
CompletableFuture<Table> tableFut2 = tableManager()
.createTableAsync(TABLE_NAME, tableChange ->
convert(SchemaBuilders.tableBuilder(SCHEMA, TABLE_NAME)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index b9ffec9220..554498ac9d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.table.distributed.storage;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
+import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR;
@@ -72,9 +74,11 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteFiveFunction;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
@@ -461,31 +465,28 @@ public class InternalTableImpl implements InternalTable {
* @return The future.
*/
private <T> CompletableFuture<T> postEnlist(CompletableFuture<T> fut,
boolean implicit, InternalTransaction tx0) {
- return fut.handle(new BiFunction<T, Throwable, CompletableFuture<T>>()
{
- @Override
- public CompletableFuture<T> apply(T r, Throwable e) {
- if (e != null) {
- Throwable e0 = wrapReplicationException((RuntimeException)
e);
+ return fut.handle((BiFunction<T, Throwable, CompletableFuture<T>>) (r,
e) -> {
+ if (e != null) {
+ RuntimeException e0 = wrapReplicationException(e);
- return tx0.rollbackAsync().handle((ignored, err) -> {
+ return tx0.rollbackAsync().handle((ignored, err) -> {
- if (err != null) {
- e0.addSuppressed(err);
- }
- throw (RuntimeException) e0;
- }); // Preserve failed state.
- } else {
- tx0.enlistResultFuture(fut);
-
- if (implicit) {
- return tx0.commitAsync()
- .exceptionally(ex -> {
- throw
wrapReplicationException((RuntimeException) ex);
- })
- .thenApply(ignored -> r);
- } else {
- return completedFuture(r);
+ if (err != null) {
+ e0.addSuppressed(err);
}
+ throw e0;
+ }); // Preserve failed state.
+ } else {
+ tx0.enlistResultFuture(fut);
+
+ if (implicit) {
+ return tx0.commitAsync()
+ .exceptionally(ex -> {
+ throw wrapReplicationException(ex);
+ })
+ .thenApply(ignored -> r);
+ } else {
+ return completedFuture(r);
}
}
}).thenCompose(x -> x);
@@ -1365,19 +1366,28 @@ public class InternalTableImpl implements InternalTable
{
}
/**
- * Wraps {@link ReplicationException} or {@link ConnectException} with
{@link TransactionException}.
+ * Casts any exception type to a client exception, wherein {@link
ReplicationException} and {@link LockException} are wrapped
+ * to {@link TransactionException}, while other exceptions are wrapped to
a common exception.
+ * The method does not wrap an exception if it is already an instance
of {@link RuntimeException}.
*
- * @param e {@link ReplicationException} or {@link CompletionException}
with cause {@link ConnectException} or {@link TimeoutException}
- * @return {@link TransactionException}
+ * @param e An exception instance to cast to a client-side one.
+ * @return {@link IgniteException} An instance of a client-side exception.
*/
- private RuntimeException wrapReplicationException(RuntimeException e) {
+ private RuntimeException wrapReplicationException(Throwable e) {
+ if (e instanceof CompletionException) {
+ e = e.getCause();
+ }
+
RuntimeException e0;
- if (e instanceof ReplicationException || e.getCause() instanceof
ReplicationException || e.getCause() instanceof ConnectException
- || e.getCause() instanceof TimeoutException) {
+ if (e instanceof ReplicationException || e instanceof ConnectException
|| e instanceof TimeoutException) {
e0 = withCause(TransactionException::new,
TX_REPLICA_UNAVAILABLE_ERR, e);
+ } else if (e instanceof LockException) {
+ e0 = withCause(TransactionException::new, ACQUIRE_LOCK_ERR, e);
+ } else if (!(e instanceof RuntimeException)) {
+ e0 = withCause(IgniteException::new, UNEXPECTED_ERR, e);
} else {
- e0 = e;
+ e0 = (RuntimeException) e;
}
return e0;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
index e43167b72e..2bf529f6dc 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
@@ -26,8 +26,7 @@ public class LockException extends
TransactionInternalCheckedException {
/**
* Creates a new instance of LockException with the given message.
*
- * @param code Full error code. {@link
org.apache.ignite.lang.ErrorGroups.Transactions#RELEASE_LOCK_ERR},
- * {@link
org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_ERR},
+ * @param code Full error code. {@link
org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_ERR},
* {@link
org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_TIMEOUT_ERR},
* @param msg The detail message.
*/
@@ -39,8 +38,7 @@ public class LockException extends
TransactionInternalCheckedException {
* Creates a new exception of LockException with the given trace id, error
code, detail message and cause.
*
* @param traceId Unique identifier of this exception.
- * @param code Full error code. {@link
org.apache.ignite.lang.ErrorGroups.Transactions#RELEASE_LOCK_ERR},
- * {@link
org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_ERR}
+ * @param code Full error code. {@link
org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_ERR},
* {@link
org.apache.ignite.lang.ErrorGroups.Transactions#ACQUIRE_LOCK_TIMEOUT_ERR},
* @param message Detail message.
* @param cause Optional nested exception (can be {@code null}).