This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 57fd6e1873 IGNITE-23013 Support IgniteSql transparency with respect to
node restart (#4279)
57fd6e1873 is described below
commit 57fd6e18737e1c2f7df2e830724cd6c28eed334e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Aug 26 10:29:54 2024 +0400
IGNITE-23013 Support IgniteSql transparency with respect to node restart
(#4279)
---
.../internal/app/ApiReferencesTestUtils.java | 3 +
.../ignite/internal/app/AsyncApiOperation.java | 14 +-
.../app/ItShutDownServerApiReferencesTest.java | 9 +-
.../org/apache/ignite/internal/app/References.java | 12 ++
.../ignite/internal/app/SyncApiOperation.java | 28 +++-
.../internal/restart/RestartProofIgnite.java | 9 +-
.../internal/restart/RestartProofIgniteSql.java | 162 +++++++++++++++++++++
7 files changed, 226 insertions(+), 11 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
index 16e7ce8c97..b3b9479c96 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
@@ -27,4 +27,7 @@ class ApiReferencesTestUtils {
static final Tuple KEY_TUPLE = Tuple.create().set("id", 1);
static final Tuple VALUE_TUPLE = Tuple.create().set("val", "one");
static final Tuple FULL_TUPLE = Tuple.create().set("id", 1).set("val",
"one");
+
+ static final String SELECT_IDS_QUERY = "SELECT id FROM " + TEST_TABLE_NAME;
+ static final String UPDATE_QUERY = "UPDATE " + TEST_TABLE_NAME + " SET val
= val WHERE id = ?";
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
index 694dd388a4..f94df5e7ce 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.app;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
+import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
+import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.VALUE_TUPLE;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -30,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.streamer.SimplePublisher;
import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
@@ -116,7 +119,16 @@ enum AsyncApiOperation {
TRANSACTIONS_BEGIN(refs -> refs.transactions.beginAsync()),
TRANSACTIONS_BEGIN_WITH_OPTS(refs -> refs.transactions.beginAsync(null)),
TRANSACTIONS_RUN_IN_TRANSACTION(refs ->
refs.transactions.runInTransactionAsync(tx -> nullCompletedFuture())),
- TRANSACTIONS_RUN_IN_TRANSACTION_WITH_OPTS(refs ->
refs.transactions.runInTransactionAsync(tx -> nullCompletedFuture(), null));
+ TRANSACTIONS_RUN_IN_TRANSACTION_WITH_OPTS(refs ->
refs.transactions.runInTransactionAsync(tx -> nullCompletedFuture(), null)),
+
+ SQL_EXECUTE(refs -> refs.sql.executeAsync(null, SELECT_IDS_QUERY)),
+ SQL_EXECUTE_STATEMENT(refs -> refs.sql.executeAsync(null,
refs.selectIdsStatement)),
+ // TODO: IGNITE-18695 - uncomment the following 2 lines.
+ // SQL_EXECUTE_WITH_MAPPER(refs -> refs.sql.executeAsync(null,
Mapper.of(Integer.class), SELECT_IDS_QUERY)),
+ // SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.executeAsync(null,
Mapper.of(Integer.class), refs.selectIdsStatement)),
+ SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatchAsync(null, UPDATE_QUERY,
BatchedArguments.of(999))),
+ SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatchAsync(null,
refs.updateStatement, BatchedArguments.of(999))),
+ SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScriptAsync(SELECT_IDS_QUERY));
private final Function<References, CompletableFuture<?>> action;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
index 15d51fae61..3355a0b567 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
@@ -57,8 +58,12 @@ class ItShutDownServerApiReferencesTest extends
ClusterPerClassIntegrationTest {
@ParameterizedTest
@EnumSource(SyncApiOperation.class)
void syncOperationsThrowAfterShutdown(SyncApiOperation operation) {
- IgniteException ex = assertThrows(IgniteException.class, () ->
operation.execute(beforeShutdown));
- assertThat(ex.getMessage(), is("The node is already shut down."));
+ if (operation.worksAfterShutdown()) {
+ assertDoesNotThrow(() -> operation.execute(beforeShutdown));
+ } else {
+ IgniteException ex = assertThrows(IgniteException.class, () ->
operation.execute(beforeShutdown));
+ assertThat(ex.getMessage(), is("The node is already shut down."));
+ }
}
@ParameterizedTest
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
index c91dae875f..16efeddb7d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
@@ -18,10 +18,14 @@
package org.apache.ignite.internal.app;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
+import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -39,6 +43,7 @@ class References {
final IgniteTables tables;
final IgniteTransactions transactions;
+ final IgniteSql sql;
final Table table; // From table().
final Table tableFromTableAsync;
@@ -55,11 +60,15 @@ class References {
final PartitionManager partitionManager;
+ final Statement selectIdsStatement;
+ final Statement updateStatement;
+
References(IgniteServer server) throws Exception {
ignite = server.api();
tables = ignite.tables();
transactions = ignite.transactions();
+ sql = ignite.sql();
table = tables.table(TEST_TABLE_NAME);
tableFromTableAsync = tables.tableAsync(TEST_TABLE_NAME).get(10,
SECONDS);
@@ -75,5 +84,8 @@ class References {
mappedRecordView = table.recordView(Mapper.of(Record.class));
partitionManager = table.partitionManager();
+
+ selectIdsStatement = sql.createStatement(SELECT_IDS_QUERY);
+ updateStatement = sql.createStatement(UPDATE_QUERY);
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
index 0f2e2c9988..8f6ff55d20 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.app;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
+import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
+import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
import static
org.apache.ignite.internal.app.ApiReferencesTestUtils.VALUE_TUPLE;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.table.mapper.Mapper;
/**
@@ -47,9 +50,9 @@ enum SyncApiOperation {
TABLE_KV_VIEW(refs -> refs.table.keyValueView()),
TABLE_TYPED_KV_VIEW(refs -> refs.table.keyValueView(Integer.class,
String.class)),
TABLE_MAPPED_KV_VIEW(refs ->
refs.table.keyValueView(Mapper.of(Integer.class), Mapper.of(String.class))),
- TABLE_RECORDVIEW(refs -> refs.table.recordView()),
- TABLE_TYPED_RECORDVIEW(refs -> refs.table.recordView(Record.class)),
- TABLE_MAPPED_RECORDVIEW(refs ->
refs.table.recordView(Mapper.of(Record.class))),
+ TABLE_RECORD_VIEW(refs -> refs.table.recordView()),
+ TABLE_TYPED_RECORD_VIEW(refs -> refs.table.recordView(Record.class)),
+ TABLE_MAPPED_RECORD_VIEW(refs ->
refs.table.recordView(Mapper.of(Record.class))),
TABLE_PARTITION_MANAGER(refs -> refs.table.partitionManager()),
TABLE_FROM_TABLE_ASYNC_PUT(refs ->
refs.tableFromTableAsync.keyValueView().put(null, KEY_TUPLE, VALUE_TUPLE)),
@@ -110,7 +113,18 @@ enum SyncApiOperation {
TRANSACTIONS_RUN_CONSUMER_IN_TRANSACTION(refs ->
refs.transactions.runInTransaction(tx -> {})),
TRANSACTIONS_RUN_CONSUMER_IN_TRANSACTION_WITH_OPTS(refs ->
refs.transactions.runInTransaction(tx -> {}, null)),
TRANSACTIONS_RUN_FUNCTION_IN_TRANSACTION(refs ->
refs.transactions.runInTransaction(tx -> null)),
- TRANSACTIONS_RUN_FUNCTION_IN_TRANSACTION_WITH_OPTS(refs ->
refs.transactions.runInTransaction(tx -> null, null));
+ TRANSACTIONS_RUN_FUNCTION_IN_TRANSACTION_WITH_OPTS(refs ->
refs.transactions.runInTransaction(tx -> null, null)),
+
+ SQL_CREATE_STATEMENT(refs -> refs.sql.createStatement(SELECT_IDS_QUERY)),
+ SQL_STATEMENT_BUILDER(refs -> refs.sql.statementBuilder()),
+ SQL_EXECUTE(refs -> refs.sql.execute(null, SELECT_IDS_QUERY)),
+ SQL_EXECUTE_STATEMENT(refs -> refs.sql.execute(null,
refs.selectIdsStatement)),
+ // TODO: IGNITE-18695 - uncomment the following 2 lines.
+ // SQL_EXECUTE_WITH_MAPPER(refs -> refs.sql.execute(null,
Mapper.of(Integer.class), SELECT_IDS_QUERY)),
+ // SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.execute(null,
Mapper.of(Integer.class), refs.selectIdsStatement)),
+ SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatch(null, UPDATE_QUERY,
BatchedArguments.of(999))),
+ SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatch(null,
refs.updateStatement, BatchedArguments.of(999))),
+ SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScript(SELECT_IDS_QUERY));
private final Consumer<References> action;
@@ -121,4 +135,10 @@ enum SyncApiOperation {
void execute(References references) {
action.accept(references);
}
+
+ boolean worksAfterShutdown() {
+ return this == IGNITE_TABLES
+ || this == IGNITE_TRANSACTIONS
+ || this == IGNITE_SQL;
+ }
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
index cee6c829cf..d86617527e 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
@@ -39,6 +39,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
private final IgniteTables tables;
private final IgniteTransactions transactions;
+ private final IgniteSql sql;
/**
* Constructor.
@@ -48,6 +49,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
tables = new RestartProofIgniteTables(attachmentLock);
transactions = new RestartProofIgniteTransactions(attachmentLock);
+ sql = new RestartProofIgniteSql(attachmentLock);
}
@Override
@@ -57,18 +59,17 @@ public class RestartProofIgnite implements Ignite, Wrapper {
@Override
public IgniteTables tables() {
- return attachmentLock.attached(ignite -> tables);
+ return tables;
}
@Override
public IgniteTransactions transactions() {
- return attachmentLock.attached(ignite -> transactions);
+ return transactions;
}
@Override
public IgniteSql sql() {
- // TODO: IGNITE-23013 - add a wrapper.
- return attachmentLock.attached(Ignite::sql);
+ return sql;
}
@Override
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
new file mode 100644
index 0000000000..e120ebfa19
--- /dev/null
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.restart;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reference to {@link IgniteSql} under a swappable {@link Ignite} instance.
When a restart happens, this switches to
+ * the new Ignite instance.
+ *
+ * <p>API operations on this are linearized with respect to node restarts.
Normally (except for situations when timeouts trigger), user
+ * operations will not interact with detached objects.
+ */
+// TODO; IGNITE-23064 - make returned cursors restart-proof.
+class RestartProofIgniteSql implements IgniteSql, Wrapper {
+ private final IgniteAttachmentLock attachmentLock;
+
+ RestartProofIgniteSql(IgniteAttachmentLock attachmentLock) {
+ this.attachmentLock = attachmentLock;
+ }
+
+ @Override
+ public Statement createStatement(String query) {
+ return attachmentLock.attached(ignite ->
ignite.sql().createStatement(query));
+ }
+
+ @Override
+ public StatementBuilder statementBuilder() {
+ return attachmentLock.attached(ignite ->
ignite.sql().statementBuilder());
+ }
+
+ @Override
+ public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String
query, @Nullable Object... arguments) {
+ return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, query, arguments));
+ }
+
+ @Override
+ public ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
+ return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, statement, arguments));
+ }
+
+ @Override
+ public <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, mapper, query, arguments));
+ }
+
+ @Override
+ public <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, mapper, statement, arguments));
+ }
+
+ @Override
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, query, arguments));
+ }
+
+ @Override
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, statement, arguments));
+ }
+
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, mapper, query, arguments));
+ }
+
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, mapper, statement, arguments));
+ }
+
+ @Override
+ public long[] executeBatch(@Nullable Transaction transaction, String
dmlQuery, BatchedArguments batch) {
+ return attachmentLock.attached(ignite ->
ignite.sql().executeBatch(transaction, dmlQuery, batch));
+ }
+
+ @Override
+ public long[] executeBatch(@Nullable Transaction transaction, Statement
dmlStatement, BatchedArguments batch) {
+ return attachmentLock.attached(ignite ->
ignite.sql().executeBatch(transaction, dmlStatement, batch));
+ }
+
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeBatchAsync(transaction, query, batch));
+ }
+
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeBatchAsync(transaction, statement, batch));
+ }
+
+ @Override
+ public void executeScript(String query, @Nullable Object... arguments) {
+ attachmentLock.consumeAttached(ignite ->
ignite.sql().executeScript(query, arguments));
+ }
+
+ @Override
+ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable
Object... arguments) {
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeScriptAsync(query, arguments));
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return attachmentLock.attached(ignite -> Wrappers.unwrap(ignite.sql(),
classToUnwrap));
+ }
+}