This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3efece8245 IGNITE-19919 Sql. ResultSet#close should close implicit
transaction if any (#2423)
3efece8245 is described below
commit 3efece824520e4345cdd89014ebe6b867c18442d
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri Aug 11 16:43:27 2023 +0300
IGNITE-19919 Sql. ResultSet#close should close implicit transaction if any
(#2423)
---
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 41 ++++++++
.../internal/sql/api/ItSqlSynchronousApiTest.java | 53 ++++++++--
.../internal/sql/engine/AsyncSqlCursorImpl.java | 4 +
.../sql/engine/AsyncSqlCursorImplTest.java | 117 +++++++++++++++++++++
.../sql/engine/framework/NoOpTransaction.java | 24 ++++-
5 files changed, 227 insertions(+), 12 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 694a28e8af..24b5b297db 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIn
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.CURSOR_CLOSED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
@@ -766,6 +767,46 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
IntStream.range(0, ex.updateCounters().length).forEach(i ->
assertEquals(1, ex.updateCounters()[i]));
}
+ @Test
+ public void resultSetCloseShouldFinishImplicitTransaction() throws
InterruptedException {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = igniteSql();
+
+ Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+ CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null,
"SELECT * FROM TEST");
+
+ AsyncResultSet<SqlRow> ars = f.join();
+ // There should be a pending transaction since not all data was read.
+ boolean txStarted = waitForCondition(() -> txManager().pending() == 1,
5000);
+ assertTrue(txStarted, "No pending transactions");
+
+ ars.closeAsync().join();
+ assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ }
+
+ @Test
+ public void resultSetFullReadShouldFinishImplicitTransaction() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = igniteSql();
+
+ // Fetch all data in one read.
+ Session ses = sql.sessionBuilder().defaultPageSize(100).build();
+ CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null,
"SELECT * FROM TEST");
+
+ AsyncResultSet<SqlRow> ars = f.join();
+ assertFalse(ars.hasMorePages());
+
+ assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ }
+
private static void checkDdl(boolean expectedApplied, Session ses, String
sql, Transaction tx) {
CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
tx,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 0a8f3f4d81..dc1bf88eb7 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.api;
import static
org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.lang.ErrorGroups.Sql.CURSOR_CLOSED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_NO_RESULT_SET_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
@@ -49,6 +50,7 @@ import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.CursorClosedException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSet;
@@ -305,15 +307,14 @@ public class ItSqlSynchronousApiTest extends
ClusterPerClassIntegrationTest {
assertThrowsPublicException(rs::next,
NoRowSetExpectedException.class, QUERY_NO_RESULT_SET_ERR, "Query has no result
set");
}
- // TODO unmute after https://issues.apache.org/jira/browse/IGNITE-19919
// Cursor closed error.
- // {
- // ResultSet rs = ses.execute(null, "SELECT * FROM TEST");
- // Thread.sleep(300); // ResultSetImpl fetches next page in
background, wait to it to complete to avoid flakiness.
- // rs.close();
- // assertThrowsPublicException(() ->
rs.forEachRemaining(Object::hashCode),
- // CursorClosedException.class, CURSOR_CLOSED_ERR, null);
- // }
+ {
+ ResultSet rs = ses.execute(null, "SELECT * FROM TEST");
+ Thread.sleep(300); // ResultSetImpl fetches next page in
background, wait to it to complete to avoid flakiness.
+ rs.close();
+ assertThrowsPublicException(() ->
rs.forEachRemaining(Object::hashCode),
+ CursorClosedException.class, CURSOR_CLOSED_ERR, null);
+ }
}
/**
@@ -416,6 +417,42 @@ public class ItSqlSynchronousApiTest extends
ClusterPerClassIntegrationTest {
IntStream.range(0, batchEx.updateCounters().length).forEach(i ->
assertEquals(1, batchEx.updateCounters()[i]));
}
+ @Test
+ public void resultSetCloseShouldFinishImplicitTransaction() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = igniteSql();
+ Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+ ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST");
+ assertEquals(1, txManager().pending());
+ rs.close();
+ assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ }
+
+ @Test
+ public void resultSetFullReadShouldFinishImplicitTransaction() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ sql("INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ IgniteSql sql = igniteSql();
+
+ // Fetch all data in one read.
+ Session ses = sql.sessionBuilder().defaultPageSize(100).build();
+ ResultSet<SqlRow> rs = ses.execute(null, "SELECT * FROM TEST");
+
+ while (rs.hasNext()) {
+ rs.next();
+ }
+
+ assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ }
+
private static void checkDdl(boolean expectedApplied, Session ses, String
sql) {
ResultSet res = ses.execute(
null,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 2be580ace8..99251fa8cd 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -91,6 +91,10 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
+ // Commit implicit transaction, if any.
+ if (implicitTx != null) {
+ implicitTx.commit();
+ }
return dataCursor.closeAsync();
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
new file mode 100644
index 0000000000..dec2b80a98
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImplTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.sql.engine.exec.AsyncWrapper;
+import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@link AsyncSqlCursorImpl}.
+ */
+public class AsyncSqlCursorImplTest {
+
+ private static final ResultSetMetadata RESULT_SET_METADATA = new
ResultSetMetadataImpl(Collections.emptyList());
+
+ /** Cursor should trigger commit of implicit transaction (if any) only if
data is fully read. */
+ @ParameterizedTest
+ @MethodSource("transactions")
+ public void testTriggerCommitAfterDataIsFullyRead(NoOpTransaction
implicitTx) {
+ List<Integer> list = List.of(1, 2, 3);
+
+ AsyncSqlCursorImpl<Integer> cursor = new
AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+ new
AsyncWrapper<>(CompletableFuture.completedFuture(list.iterator()),
Runnable::run));
+
+ int requestRows = 2;
+ BatchedResult<Integer> in1 =
cursor.requestNextAsync(requestRows).join();
+ assertEquals(in1.items(), list.subList(0, requestRows));
+
+ if (implicitTx != null) {
+ CompletableFuture<Void> f = implicitTx.commitFuture();
+ assertFalse(f.isDone(), "Implicit transaction should have not been
committed because there is more data.");
+ }
+
+ BatchedResult<Integer> in2 =
cursor.requestNextAsync(requestRows).join();
+ assertEquals(in2.items(), list.subList(requestRows, list.size()));
+
+ if (implicitTx != null) {
+ CompletableFuture<Void> f = implicitTx.commitFuture();
+ assertTrue(f.isDone(), "Implicit transaction should been committed
because there is no more data");
+ }
+ }
+
+ /** Exception on read should trigger rollback of implicit transaction, if
any. */
+ @ParameterizedTest
+ @MethodSource("transactions")
+ public void testExceptionRollbacksImplicitTx(NoOpTransaction implicitTx) {
+ IgniteException err = new IgniteException(Common.INTERNAL_ERR);
+
+ AsyncSqlCursorImpl<Integer> cursor = new
AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx,
+ new AsyncWrapper<>(CompletableFuture.failedFuture(err),
Runnable::run));
+
+ CompletionException t = assertThrows(CompletionException.class, () ->
cursor.requestNextAsync(1).join());
+
+ if (implicitTx != null) {
+ CompletableFuture<Void> f = implicitTx.rollbackFuture();
+ assertTrue(f.isDone(), "Implicit transaction should have been
rolled back: " + f);
+ }
+
+ IgniteException igniteErr = assertInstanceOf(IgniteException.class,
t.getCause());
+ assertEquals(err.codeAsString(), igniteErr.codeAsString());
+ }
+
+ /** Cursor close should trigger commit of implicit transaction, if any. */
+ @ParameterizedTest
+ @MethodSource("transactions")
+ public void testCloseCommitsImplicitTx(NoOpTransaction implicitTx) {
+ AsyncCursor<Integer> data = new AsyncWrapper<>(List.of(1, 2, 3,
4).iterator());
+ AsyncSqlCursorImpl<Integer> cursor = new
AsyncSqlCursorImpl<>(SqlQueryType.QUERY, RESULT_SET_METADATA, implicitTx, data);
+ cursor.closeAsync().join();
+
+ if (implicitTx != null) {
+ CompletableFuture<Void> f = implicitTx.commitFuture();
+ assertTrue(f.isDone(), "Implicit transaction should have been
committed: " + f);
+ }
+ }
+
+ private static Stream<Arguments> transactions() {
+ return Stream.of(
+ Arguments.of(Named.named("implicit-tx",
NoOpTransaction.readOnly("TX"))),
+ Arguments.of(Named.named("no implicit-tx", null))
+ );
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 2ab456accf..e9e4d59eb3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -46,6 +46,10 @@ public final class NoOpTransaction implements
InternalTransaction {
private final boolean readOnly;
+ private final CompletableFuture<Void> commitFut = new
CompletableFuture<>();
+
+ private final CompletableFuture<Void> rollbackFut = new
CompletableFuture<>();
+
/** Creates a read-write transaction. */
public static NoOpTransaction readWrite(String name) {
return new NoOpTransaction(name, false);
@@ -84,22 +88,24 @@ public final class NoOpTransaction implements
InternalTransaction {
@Override
public void commit() throws TransactionException {
-
+ commitAsync().join();
}
@Override
public CompletableFuture<Void> commitAsync() {
- return CompletableFuture.completedFuture(null);
+ commitFut.complete(null);
+ return commitFut;
}
@Override
public void rollback() throws TransactionException {
-
+ rollbackAsync().join();
}
@Override
public CompletableFuture<Void> rollbackAsync() {
- return CompletableFuture.completedFuture(null);
+ rollbackFut.complete(null);
+ return rollbackFut;
}
@Override
@@ -155,4 +161,14 @@ public final class NoOpTransaction implements
InternalTransaction {
public void enlistResultFuture(CompletableFuture<?> resultFuture) {
resultFuture.complete(null);
}
+
+ /** Returns a {@link CompletableFuture} that completes when this
transaction commits. */
+ public CompletableFuture<Void> commitFuture() {
+ return commitFut;
+ }
+
+ /** Returns a {@link CompletableFuture} that completes when this
transaction rollbacks. */
+ public CompletableFuture<Void> rollbackFuture() {
+ return rollbackFut;
+ }
}