This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 57fd6e1873 IGNITE-23013 Support IgniteSql transparency with respect to 
node restart (#4279)
57fd6e1873 is described below

commit 57fd6e18737e1c2f7df2e830724cd6c28eed334e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Aug 26 10:29:54 2024 +0400

    IGNITE-23013 Support IgniteSql transparency with respect to node restart 
(#4279)
---
 .../internal/app/ApiReferencesTestUtils.java       |   3 +
 .../ignite/internal/app/AsyncApiOperation.java     |  14 +-
 .../app/ItShutDownServerApiReferencesTest.java     |   9 +-
 .../org/apache/ignite/internal/app/References.java |  12 ++
 .../ignite/internal/app/SyncApiOperation.java      |  28 +++-
 .../internal/restart/RestartProofIgnite.java       |   9 +-
 .../internal/restart/RestartProofIgniteSql.java    | 162 +++++++++++++++++++++
 7 files changed, 226 insertions(+), 11 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
index 16e7ce8c97..b3b9479c96 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ApiReferencesTestUtils.java
@@ -27,4 +27,7 @@ class ApiReferencesTestUtils {
     static final Tuple KEY_TUPLE = Tuple.create().set("id", 1);
     static final Tuple VALUE_TUPLE = Tuple.create().set("val", "one");
     static final Tuple FULL_TUPLE = Tuple.create().set("id", 1).set("val", 
"one");
+
+    static final String SELECT_IDS_QUERY = "SELECT id FROM " + TEST_TABLE_NAME;
+    static final String UPDATE_QUERY = "UPDATE " + TEST_TABLE_NAME + " SET val 
= val WHERE id = ?";
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
index 694dd388a4..f94df5e7ce 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/AsyncApiOperation.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.app;
 
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
+import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
+import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.VALUE_TUPLE;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
@@ -30,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.internal.streamer.SimplePublisher;
 import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
 
@@ -116,7 +119,16 @@ enum AsyncApiOperation {
     TRANSACTIONS_BEGIN(refs -> refs.transactions.beginAsync()),
     TRANSACTIONS_BEGIN_WITH_OPTS(refs -> refs.transactions.beginAsync(null)),
     TRANSACTIONS_RUN_IN_TRANSACTION(refs -> 
refs.transactions.runInTransactionAsync(tx -> nullCompletedFuture())),
-    TRANSACTIONS_RUN_IN_TRANSACTION_WITH_OPTS(refs -> 
refs.transactions.runInTransactionAsync(tx -> nullCompletedFuture(), null));
+    TRANSACTIONS_RUN_IN_TRANSACTION_WITH_OPTS(refs -> 
refs.transactions.runInTransactionAsync(tx -> nullCompletedFuture(), null)),
+
+    SQL_EXECUTE(refs -> refs.sql.executeAsync(null, SELECT_IDS_QUERY)),
+    SQL_EXECUTE_STATEMENT(refs -> refs.sql.executeAsync(null, 
refs.selectIdsStatement)),
+    // TODO: IGNITE-18695 - uncomment the following 2 lines.
+    // SQL_EXECUTE_WITH_MAPPER(refs -> refs.sql.executeAsync(null, 
Mapper.of(Integer.class), SELECT_IDS_QUERY)),
+    // SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.executeAsync(null, 
Mapper.of(Integer.class), refs.selectIdsStatement)),
+    SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatchAsync(null, UPDATE_QUERY, 
BatchedArguments.of(999))),
+    SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatchAsync(null, 
refs.updateStatement, BatchedArguments.of(999))),
+    SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScriptAsync(SELECT_IDS_QUERY));
 
     private final Function<References, CompletableFuture<?>> action;
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
index 15d51fae61..3355a0b567 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/ItShutDownServerApiReferencesTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
@@ -57,8 +58,12 @@ class ItShutDownServerApiReferencesTest extends 
ClusterPerClassIntegrationTest {
     @ParameterizedTest
     @EnumSource(SyncApiOperation.class)
     void syncOperationsThrowAfterShutdown(SyncApiOperation operation) {
-        IgniteException ex = assertThrows(IgniteException.class, () -> 
operation.execute(beforeShutdown));
-        assertThat(ex.getMessage(), is("The node is already shut down."));
+        if (operation.worksAfterShutdown()) {
+            assertDoesNotThrow(() -> operation.execute(beforeShutdown));
+        } else {
+            IgniteException ex = assertThrows(IgniteException.class, () -> 
operation.execute(beforeShutdown));
+            assertThat(ex.getMessage(), is("The node is already shut down."));
+        }
     }
 
     @ParameterizedTest
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
index c91dae875f..16efeddb7d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/References.java
@@ -18,10 +18,14 @@
 package org.apache.ignite.internal.app;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
+import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
 
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServer;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.IgniteTables;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
@@ -39,6 +43,7 @@ class References {
 
     final IgniteTables tables;
     final IgniteTransactions transactions;
+    final IgniteSql sql;
 
     final Table table; // From table().
     final Table tableFromTableAsync;
@@ -55,11 +60,15 @@ class References {
 
     final PartitionManager partitionManager;
 
+    final Statement selectIdsStatement;
+    final Statement updateStatement;
+
     References(IgniteServer server) throws Exception {
         ignite = server.api();
 
         tables = ignite.tables();
         transactions = ignite.transactions();
+        sql = ignite.sql();
 
         table = tables.table(TEST_TABLE_NAME);
         tableFromTableAsync = tables.tableAsync(TEST_TABLE_NAME).get(10, 
SECONDS);
@@ -75,5 +84,8 @@ class References {
         mappedRecordView = table.recordView(Mapper.of(Record.class));
 
         partitionManager = table.partitionManager();
+
+        selectIdsStatement = sql.createStatement(SELECT_IDS_QUERY);
+        updateStatement = sql.createStatement(UPDATE_QUERY);
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
index 0f2e2c9988..8f6ff55d20 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/app/SyncApiOperation.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.app;
 
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.FULL_TUPLE;
 import static org.apache.ignite.internal.app.ApiReferencesTestUtils.KEY_TUPLE;
+import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.SELECT_IDS_QUERY;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.TEST_TABLE_NAME;
+import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.UPDATE_QUERY;
 import static 
org.apache.ignite.internal.app.ApiReferencesTestUtils.VALUE_TUPLE;
 
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.table.mapper.Mapper;
 
 /**
@@ -47,9 +50,9 @@ enum SyncApiOperation {
     TABLE_KV_VIEW(refs -> refs.table.keyValueView()),
     TABLE_TYPED_KV_VIEW(refs -> refs.table.keyValueView(Integer.class, 
String.class)),
     TABLE_MAPPED_KV_VIEW(refs -> 
refs.table.keyValueView(Mapper.of(Integer.class), Mapper.of(String.class))),
-    TABLE_RECORDVIEW(refs -> refs.table.recordView()),
-    TABLE_TYPED_RECORDVIEW(refs -> refs.table.recordView(Record.class)),
-    TABLE_MAPPED_RECORDVIEW(refs -> 
refs.table.recordView(Mapper.of(Record.class))),
+    TABLE_RECORD_VIEW(refs -> refs.table.recordView()),
+    TABLE_TYPED_RECORD_VIEW(refs -> refs.table.recordView(Record.class)),
+    TABLE_MAPPED_RECORD_VIEW(refs -> 
refs.table.recordView(Mapper.of(Record.class))),
     TABLE_PARTITION_MANAGER(refs -> refs.table.partitionManager()),
 
     TABLE_FROM_TABLE_ASYNC_PUT(refs -> 
refs.tableFromTableAsync.keyValueView().put(null, KEY_TUPLE, VALUE_TUPLE)),
@@ -110,7 +113,18 @@ enum SyncApiOperation {
     TRANSACTIONS_RUN_CONSUMER_IN_TRANSACTION(refs -> 
refs.transactions.runInTransaction(tx -> {})),
     TRANSACTIONS_RUN_CONSUMER_IN_TRANSACTION_WITH_OPTS(refs -> 
refs.transactions.runInTransaction(tx -> {}, null)),
     TRANSACTIONS_RUN_FUNCTION_IN_TRANSACTION(refs -> 
refs.transactions.runInTransaction(tx -> null)),
-    TRANSACTIONS_RUN_FUNCTION_IN_TRANSACTION_WITH_OPTS(refs -> 
refs.transactions.runInTransaction(tx -> null, null));
+    TRANSACTIONS_RUN_FUNCTION_IN_TRANSACTION_WITH_OPTS(refs -> 
refs.transactions.runInTransaction(tx -> null, null)),
+
+    SQL_CREATE_STATEMENT(refs -> refs.sql.createStatement(SELECT_IDS_QUERY)),
+    SQL_STATEMENT_BUILDER(refs -> refs.sql.statementBuilder()),
+    SQL_EXECUTE(refs -> refs.sql.execute(null, SELECT_IDS_QUERY)),
+    SQL_EXECUTE_STATEMENT(refs -> refs.sql.execute(null, 
refs.selectIdsStatement)),
+    // TODO: IGNITE-18695 - uncomment the following 2 lines.
+    // SQL_EXECUTE_WITH_MAPPER(refs -> refs.sql.execute(null, 
Mapper.of(Integer.class), SELECT_IDS_QUERY)),
+    // SQL_EXECUTE_STATEMENT_WITH_MAPPER(refs -> refs.sql.execute(null, 
Mapper.of(Integer.class), refs.selectIdsStatement)),
+    SQL_EXECUTE_BATCH(refs -> refs.sql.executeBatch(null, UPDATE_QUERY, 
BatchedArguments.of(999))),
+    SQL_EXECUTE_BATCH_STATEMENT(refs -> refs.sql.executeBatch(null, 
refs.updateStatement, BatchedArguments.of(999))),
+    SQL_EXECUTE_SCRIPT(refs -> refs.sql.executeScript(SELECT_IDS_QUERY));
 
     private final Consumer<References> action;
 
@@ -121,4 +135,10 @@ enum SyncApiOperation {
     void execute(References references) {
         action.accept(references);
     }
+
+    boolean worksAfterShutdown() {
+        return this == IGNITE_TABLES
+                || this == IGNITE_TRANSACTIONS
+                || this == IGNITE_SQL;
+    }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
index cee6c829cf..d86617527e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgnite.java
@@ -39,6 +39,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
 
     private final IgniteTables tables;
     private final IgniteTransactions transactions;
+    private final IgniteSql sql;
 
     /**
      * Constructor.
@@ -48,6 +49,7 @@ public class RestartProofIgnite implements Ignite, Wrapper {
 
         tables = new RestartProofIgniteTables(attachmentLock);
         transactions = new RestartProofIgniteTransactions(attachmentLock);
+        sql = new RestartProofIgniteSql(attachmentLock);
     }
 
     @Override
@@ -57,18 +59,17 @@ public class RestartProofIgnite implements Ignite, Wrapper {
 
     @Override
     public IgniteTables tables() {
-        return attachmentLock.attached(ignite -> tables);
+        return tables;
     }
 
     @Override
     public IgniteTransactions transactions() {
-        return attachmentLock.attached(ignite -> transactions);
+        return transactions;
     }
 
     @Override
     public IgniteSql sql() {
-        // TODO: IGNITE-23013 - add a wrapper.
-        return attachmentLock.attached(Ignite::sql);
+        return sql;
     }
 
     @Override
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
new file mode 100644
index 0000000000..e120ebfa19
--- /dev/null
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.restart;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reference to {@link IgniteSql} under a swappable {@link Ignite} instance. 
When a restart happens, this switches to
+ * the new Ignite instance.
+ *
+ * <p>API operations on this are linearized with respect to node restarts. 
Normally (except for situations when timeouts trigger), user
+ * operations will not interact with detached objects.
+ */
+// TODO; IGNITE-23064 - make returned cursors restart-proof.
+class RestartProofIgniteSql implements IgniteSql, Wrapper {
+    private final IgniteAttachmentLock attachmentLock;
+
+    RestartProofIgniteSql(IgniteAttachmentLock attachmentLock) {
+        this.attachmentLock = attachmentLock;
+    }
+
+    @Override
+    public Statement createStatement(String query) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().createStatement(query));
+    }
+
+    @Override
+    public StatementBuilder statementBuilder() {
+        return attachmentLock.attached(ignite -> 
ignite.sql().statementBuilder());
+    }
+
+    @Override
+    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String 
query, @Nullable Object... arguments) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, query, arguments));
+    }
+
+    @Override
+    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, statement, arguments));
+    }
+
+    @Override
+    public <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, mapper, query, arguments));
+    }
+
+    @Override
+    public <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, mapper, statement, arguments));
+    }
+
+    @Override
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, query, arguments));
+    }
+
+    @Override
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, statement, arguments));
+    }
+
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, mapper, query, arguments));
+    }
+
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, mapper, statement, arguments));
+    }
+
+    @Override
+    public long[] executeBatch(@Nullable Transaction transaction, String 
dmlQuery, BatchedArguments batch) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().executeBatch(transaction, dmlQuery, batch));
+    }
+
+    @Override
+    public long[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().executeBatch(transaction, dmlStatement, batch));
+    }
+
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeBatchAsync(transaction, query, batch));
+    }
+
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeBatchAsync(transaction, statement, batch));
+    }
+
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        attachmentLock.consumeAttached(ignite -> 
ignite.sql().executeScript(query, arguments));
+    }
+
+    @Override
+    public CompletableFuture<Void> executeScriptAsync(String query, @Nullable 
Object... arguments) {
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeScriptAsync(query, arguments));
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return attachmentLock.attached(ignite -> Wrappers.unwrap(ignite.sql(), 
classToUnwrap));
+    }
+}

Reply via email to