This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f10ab88c85 IGNITE-20416 Retry when schema change is detected during
implicit transaction (#3584)
f10ab88c85 is described below
commit f10ab88c85266990e29b64b713c2076be51e6dcd
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Apr 11 12:07:41 2024 +0400
IGNITE-20416 Retry when schema change is detected during implicit
transaction (#3584)
---
.../Table/SchemaSynchronizationTest.cs | 5 -
.../ItSchemaSyncAndImplicitTransactionsTest.java | 290 +++++++++++++++++++++
.../schemasync/ItSchemaSyncSingleNodeTest.java | 8 +-
.../ignite/internal/table/AbstractTableView.java | 29 ++-
.../replicator/IncompatibleSchemaException.java | 3 +-
5 files changed, 319 insertions(+), 16 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 7bc9cb63c4..13e09dff51 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -339,7 +339,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
[Values(true, false)] bool insertNewColumn,
[Values(true, false)] bool withRemove)
{
- using var metricListener = new MetricsTests.Listener();
await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName}
(KEY bigint PRIMARY KEY)");
var table = await Client.Tables.GetTableAsync(TestTableName);
@@ -369,10 +368,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase
yield return DataStreamerItem.Create(GetTuple(i));
}
- // Wait for background streaming to complete.
- // TODO: Remove this workaround when IGNITE-20416 is fixed.
- metricListener.AssertMetricGreaterOrEqual("streamer-items-sent",
6, 3000);
-
// Update schema.
// New schema has a new column with a default value, so it is not
required to provide it in the streamed data.
await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName}
ADD COLUMN VAL varchar DEFAULT 'FOO'");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java
new file mode 100644
index 0000000000..061d5a0b20
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndImplicitTransactionsTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about Schema Sync interaction with implicit transactions.
+ */
+@SuppressWarnings("resource")
+class ItSchemaSyncAndImplicitTransactionsTest extends
ClusterPerClassIntegrationTest {
+ private static final int NODES_TO_START = 1;
+
+ private static final String TABLE_NAME = "test";
+
+ private static final int ITERATIONS = 100;
+
+ private static final int BATCH_SIZE = 10;
+
+ @Override
+ protected int initialNodes() {
+ return NODES_TO_START;
+ }
+
+ @BeforeEach
+ void createTable() {
+ executeUpdate(
+ "CREATE TABLE " + TABLE_NAME + " (id int, val varchar NOT
NULL, PRIMARY KEY USING HASH (id))",
+ CLUSTER.aliveNode().sql()
+ );
+ }
+
+ @AfterEach
+ void dropTable() {
+ executeUpdate("DROP TABLE IF EXISTS " + TABLE_NAME,
CLUSTER.aliveNode().sql());
+ }
+
+ private static void makeCompatibleChangeTo(String tableName) {
+ executeUpdate("ALTER TABLE " + tableName + " ALTER COLUMN val DROP NOT
NULL", CLUSTER.aliveNode().sql());
+ }
+
+ @ParameterizedTest
+ @EnumSource(BinaryRecordViewOperation.class)
+ @DisplayName("Concurrent schema changes are transparent for implicit
transactions via binary record views")
+ public void
schemaChangesTransparencyForBinaryRecordView(BinaryRecordViewOperation
operation) {
+ RecordView<Tuple> view =
CLUSTER.aliveNode().tables().table(TABLE_NAME).recordView();
+
+ CompletableFuture<Void> operationsFuture = runAsync(() -> {
+ for (int i = 0; i < ITERATIONS; i++) {
+ operation.execute(i, view);
+ }
+ });
+
+ makeCompatibleChangeTo(TABLE_NAME);
+
+ assertThat(operationsFuture, willCompleteSuccessfully());
+ }
+
+ @ParameterizedTest
+ @EnumSource(PlainRecordViewOperation.class)
+ @DisplayName("Concurrent schema changes are transparent for implicit
transactions via plain record views")
+ public void
schemaChangesTransparencyForPlainRecordView(PlainRecordViewOperation operation)
{
+ RecordView<Record> view =
CLUSTER.aliveNode().tables().table(TABLE_NAME).recordView(Record.class);
+
+ CompletableFuture<Void> operationsFuture = runAsync(() -> {
+ for (int i = 0; i < ITERATIONS; i++) {
+ operation.execute(i, view);
+ }
+ });
+
+ makeCompatibleChangeTo(TABLE_NAME);
+
+ assertThat(operationsFuture, willCompleteSuccessfully());
+ }
+
+ @ParameterizedTest
+ @EnumSource(BinaryKvViewOperation.class)
+ @DisplayName("Concurrent schema changes are transparent for implicit
transactions via binary KV views")
+ public void schemaChangesTransparencyForBinaryKvView(BinaryKvViewOperation
operation) {
+ KeyValueView<Tuple, Tuple> view =
CLUSTER.aliveNode().tables().table(TABLE_NAME).keyValueView();
+
+ CompletableFuture<Void> operationsFuture = runAsync(() -> {
+ for (int i = 0; i < ITERATIONS; i++) {
+ operation.execute(i, view);
+ }
+ });
+
+ makeCompatibleChangeTo(TABLE_NAME);
+
+ assertThat(operationsFuture, willCompleteSuccessfully());
+ }
+
+ @ParameterizedTest
+ @EnumSource(PlainKvViewOperation.class)
+ @DisplayName("Concurrent schema changes are transparent for implicit
transactions via plain KV views")
+ public void schemaChangesTransparencyForPlainKvView(PlainKvViewOperation
operation) {
+ KeyValueView<Integer, String> view =
CLUSTER.aliveNode().tables().table(TABLE_NAME).keyValueView(Integer.class,
String.class);
+
+ CompletableFuture<Void> operationsFuture = runAsync(() -> {
+ for (int i = 0; i < ITERATIONS; i++) {
+ operation.execute(i, view);
+ }
+ });
+
+ makeCompatibleChangeTo(TABLE_NAME);
+
+ assertThat(operationsFuture, willCompleteSuccessfully());
+ }
+
+ private static Tuple keyTuple(int id) {
+ return Tuple.create().set("id", id);
+ }
+
+ private static List<Tuple> keyTuples(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .mapToObj(ItSchemaSyncAndImplicitTransactionsTest::keyTuple)
+ .collect(toList());
+ }
+
+ private enum BinaryRecordViewOperation {
+ SINGLE_READ((view, base) -> view.get(null, keyTuple(base))),
+ MULTI_PARTITION_READ((view, base) -> view.getAll(null,
keyTuples(base))),
+ SINGLE_WRITE((view, base) -> view.upsert(null, fullTuple(base))),
+ MULTI_PARTITION_WRITE((view, base) -> view.upsertAll(null,
fullTuples(base)));
+
+ private final BiConsumer<RecordView<Tuple>, Integer> action;
+
+ BinaryRecordViewOperation(BiConsumer<RecordView<Tuple>, Integer>
action) {
+ this.action = action;
+ }
+
+ void execute(int base, RecordView<Tuple> view) {
+ action.accept(view, base);
+ }
+
+ private static Tuple fullTuple(int id) {
+ return Tuple.create().set("id", id).set("val",
Integer.toString(id));
+ }
+
+ private static List<Tuple> fullTuples(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .mapToObj(BinaryRecordViewOperation::fullTuple)
+ .collect(toList());
+ }
+ }
+
+ @SuppressWarnings({"FieldCanBeLocal", "unused"})
+ private static class Record {
+ private int id;
+ private @Nullable String val;
+
+ private Record() {
+ }
+
+ private Record(int id, @Nullable String val) {
+ this.id = id;
+ this.val = val;
+ }
+ }
+
+ private enum PlainRecordViewOperation {
+ SINGLE_READ((view, base) -> view.get(null, keyRecord(base))),
+ MULTI_PARTITION_READ((view, base) -> view.getAll(null,
keyRecords(base))),
+ SINGLE_WRITE((view, base) -> view.upsert(null, fullRecord(base))),
+ MULTI_PARTITION_WRITE((view, base) -> view.upsertAll(null,
fullRecords(base)));
+
+ private final BiConsumer<RecordView<Record>, Integer> action;
+
+ PlainRecordViewOperation(BiConsumer<RecordView<Record>, Integer>
action) {
+ this.action = action;
+ }
+
+ void execute(int base, RecordView<Record> view) {
+ action.accept(view, base);
+ }
+
+ private static Record keyRecord(int id) {
+ return new Record(id, null);
+ }
+
+ private static List<Record> keyRecords(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .mapToObj(PlainRecordViewOperation::keyRecord)
+ .collect(toList());
+ }
+
+ private static Record fullRecord(int id) {
+ return new Record(id, Integer.toString(id));
+ }
+
+ private static List<Record> fullRecords(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .mapToObj(PlainRecordViewOperation::fullRecord)
+ .collect(toList());
+ }
+ }
+
+ private enum BinaryKvViewOperation {
+ SINGLE_READ((view, base) -> view.get(null, keyTuple(base))),
+ MULTI_PARTITION_READ((view, base) -> view.getAll(null,
keyTuples(base))),
+ SINGLE_WRITE((view, base) -> view.put(null, keyTuple(base),
valueTuple(base))),
+ MULTI_PARTITION_WRITE((view, base) -> view.putAll(null,
mapOfTuples(base)));
+
+ private final BiConsumer<KeyValueView<Tuple, Tuple>, Integer> action;
+
+ BinaryKvViewOperation(BiConsumer<KeyValueView<Tuple, Tuple>, Integer>
action) {
+ this.action = action;
+ }
+
+ void execute(int base, KeyValueView<Tuple, Tuple> view) {
+ action.accept(view, base);
+ }
+
+ private static Tuple valueTuple(int id) {
+ return Tuple.create().set("val", Integer.toString(id));
+ }
+
+ private static Map<Tuple, Tuple> mapOfTuples(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .boxed()
+
.collect(toMap(ItSchemaSyncAndImplicitTransactionsTest::keyTuple,
BinaryKvViewOperation::valueTuple));
+ }
+ }
+
+ private enum PlainKvViewOperation {
+ SINGLE_READ((view, base) -> view.get(null, base)),
+ MULTI_PARTITION_READ((view, base) -> view.getAll(null, keys(base))),
+ SINGLE_WRITE((view, base) -> view.put(null, base,
Integer.toString(base))),
+ MULTI_PARTITION_WRITE((view, base) -> view.putAll(null,
mapToValues(base)));
+
+ private final BiConsumer<KeyValueView<Integer, String>, Integer>
action;
+
+ PlainKvViewOperation(BiConsumer<KeyValueView<Integer, String>,
Integer> action) {
+ this.action = action;
+ }
+
+ void execute(int base, KeyValueView<Integer, String> view) {
+ action.accept(view, base);
+ }
+
+ private static List<Integer> keys(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .boxed()
+ .collect(toList());
+ }
+
+ private static Map<Integer, String> mapToValues(int base) {
+ return IntStream.range(base, base + BATCH_SIZE)
+ .boxed()
+ .collect(toMap(identity(), n -> Integer.toString(n)));
+ }
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
index 206670eced..eef367cfcf 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
@@ -120,15 +120,11 @@ class ItSchemaSyncSingleNodeTest extends
ClusterPerTestIntegrationTest {
}
private void createTable() {
- cluster.doInSession(0, session -> {
- executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int, val
varchar, PRIMARY KEY USING HASH (id))", session);
- });
+ executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int, val varchar,
PRIMARY KEY USING HASH (id))", node.sql());
}
private void alterTable(String tableName) {
- cluster.doInSession(0, session -> {
- executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added
int", session);
- });
+ executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int",
node.sql());
}
private static void putPreExistingValueTo(Table table) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 9954734794..bfffdf5956 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.table.criteria.CursorAdapter;
import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
import org.apache.ignite.internal.table.criteria.SqlSerializer;
import org.apache.ignite.internal.table.criteria.SqlSerializer.Builder;
+import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -161,21 +162,41 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
return schemaVersionFuture
.thenCompose(schemaVersion -> action.act(schemaVersion)
.handle((res, ex) -> {
+ if (ex == null) {
+ return completedFuture(res);
+ }
+
if
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
+ // There is no transaction, and table version
was changed between taking the table version (that was used
+ // to marshal inputs and would be used to
unmarshal outputs) and starting an implicit transaction
+ // in InternalTable. A transaction must always
work with binary rows of the same table version matching the
+ // version corresponding to the transaction
creation moment, so this mismatch is not tolerable: we need
+ // to retry the operation here.
+
assert tx == null : "Only for implicit
transactions a retry might be requested";
- assert previousSchemaVersion == null ||
!Objects.equals(schemaVersion, previousSchemaVersion)
- : "Same schema version (" +
schemaVersion
- + ") on a retry: something is wrong,
is this caused by the test setup?";
+
assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion);
// Repeat.
+ return withSchemaSync(tx, schemaVersion,
action);
+ } else if (tx == null &&
isOrCausedBy(IncompatibleSchemaException.class, ex)) {
+ // Table version was changed while we were
executing an implicit transaction (between it had been created
+ // and the moment when the operation actually
touched the partition), let's retry.
+
assertSchemaVersionIncreased(previousSchemaVersion, schemaVersion);
+
return withSchemaSync(tx, schemaVersion,
action);
} else {
- return ex == null ? completedFuture(res) :
CompletableFuture.<T>failedFuture(ex);
+ return CompletableFuture.<T>failedFuture(ex);
}
}))
.thenCompose(identity());
}
+ private static void assertSchemaVersionIncreased(@Nullable Integer
previousSchemaVersion, Integer schemaVersion) {
+ assert previousSchemaVersion == null || !Objects.equals(schemaVersion,
previousSchemaVersion)
+ : "Same schema version (" + schemaVersion
+ + ") on a retry: something is wrong, is this caused by the
test setup?";
+ }
+
/**
* Map columns to it's names.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
index f553110951..04480f784c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/IncompatibleSchemaException.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.replicator;
+import
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.tx.TransactionException;
@@ -24,7 +25,7 @@ import org.apache.ignite.tx.TransactionException;
* Thrown when, during an attempt to execute a transactional operation, it
turns out that the operation cannot be executed
* because an incompatible schema change has happened.
*/
-public class IncompatibleSchemaException extends TransactionException {
+public class IncompatibleSchemaException extends TransactionException
implements ExpectedReplicationException {
public IncompatibleSchemaException(String message) {
super(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR, message);
}