This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 43859a1f7e IGNITE-17998 Sql. Close the cursor synchronously when the session is closed (#1330)
43859a1f7e is described below
commit 43859a1f7ef8ef761ca3da8587a3d5c7b073b841
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Nov 16 13:11:31 2022 +0300
IGNITE-17998 Sql. Close the cursor synchronously when the session is closed (#1330)
---
.../ignite/internal/sql/api/ItCommonApiTest.java | 9 +++--
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 5 ++-
.../sql/engine/exec/ExecutionServiceImpl.java | 39 ++++++++++++++--------
3 files changed, 32 insertions(+), 21 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 4ede4c48c4..6b46dc5b76 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.api;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.lang.ErrorGroups.Sql;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
-import org.apache.ignite.sql.CursorClosedException;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
@@ -78,16 +79,14 @@ public class ItCommonApiTest extends
AbstractBasicIntegrationTest {
assertEquals(Sql.SESSION_NOT_FOUND_ERR, ex.code());
// already started query should fail due to session has been expired
- ex = assertThrows(CursorClosedException.class, () -> {
+ assertThrowsWithCause(() -> {
while (rs1.hasNext()) {
rs1.next();
}
- });
+ }, ExecutionCancelledException.class);
rs1.close();
- assertEquals(Sql.CURSOR_CLOSED_ERR, ex.code());
-
// second session could proceed with execution
while (rs2.hasNext()) {
rs2.next();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 6b651caef0..418d8941a9 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
@@ -76,7 +77,6 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -577,7 +577,6 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
}
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17998")
@Test
public void closeSession() throws ExecutionException, InterruptedException
{
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
@@ -597,7 +596,7 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
assertThrowsWithCause(
() -> ars0.fetchNextPage().toCompletableFuture().get(),
- SqlException.class
+ ExecutionCancelledException.class
);
assertThrowsWithCause(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index e1cd92462c..81c70e51f4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -197,7 +197,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
assert old == null;
- ctx.cancel().add(() -> queryManager.close(false));
+ ctx.cancel().add(() -> queryManager.close(true));
return queryManager.execute(plan);
}
@@ -622,20 +622,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return cancelFut.thenApply(Function.identity());
}
- CompletableFuture<Void> start = new CompletableFuture<>();
+ CompletableFuture<Void> start = closeExecNode(cancel);
start
- .thenCompose(none -> {
- if (!root.completeExceptionally(new
ExecutionCancelledException()) && !root.isCompletedExceptionally()) {
- if (cancel) {
- return root.thenAccept(root ->
root.onError(new ExecutionCancelledException()));
- }
-
- return root.thenCompose(AsyncRootNode::closeAsync);
- }
-
- return CompletableFuture.completedFuture(null);
- })
.thenCompose(tmp -> {
Map<String, List<CompletableFuture<?>>>
requestsPerNode = new HashMap<>();
for (Map.Entry<RemoteFragmentKey,
CompletableFuture<Void>> entry : remoteFragmentInitCompletion.entrySet()) {
@@ -693,6 +682,30 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return cancelFut.thenApply(Function.identity());
}
+
+ /**
+ * Synchronously closes the tree's execution iterator.
+ *
+ * @param cancel Forces execution to terminate with {@link ExecutionCancelledException}.
+ * @return Completable future that should run asynchronously.
+ */
+ private CompletableFuture<Void> closeExecNode(boolean cancel) {
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+
+ if (!root.completeExceptionally(new ExecutionCancelledException())
&& !root.isCompletedExceptionally()) {
+ AsyncRootNode<RowT, List<Object>> node = root.getNow(null);
+
+ if (!cancel) {
+ CompletableFuture<Void> closeFut = node.closeAsync();
+
+ return fut.thenCompose(v -> closeFut);
+ }
+
+ node.onError(new ExecutionCancelledException());
+ }
+
+ return fut;
+ }
}
@FunctionalInterface