This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c44bbbb229 IGNITE-20387: Remap most exceptions to SqlExceptions for
SQL API (#2613)
c44bbbb229 is described below
commit c44bbbb229d44737fdb99fa29c3a8b4d5e4cc248
Author: ygerzhedovich <[email protected]>
AuthorDate: Mon Oct 16 16:50:50 2023 +0300
IGNITE-20387: Remap most exceptions to SqlExceptions for SQL API (#2613)
---
.../internal/lang/IgniteExceptionMapperUtil.java | 33 +-
.../ignite/internal/sql/SyncResultSetAdapter.java | 4 +-
.../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs | 14 +-
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 2 +-
.../internal/runner/app/ItTablesApiTest.java | 31 +-
...nchronousApiTest.java => ItSqlApiBaseTest.java} | 587 +++++++-------
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 851 ++-------------------
.../sql/api/ItSqlClientAsynchronousApiTest.java | 6 +
.../sql/api/ItSqlClientSynchronousApiTest.java | 24 +-
.../internal/sql/api/ItSqlSynchronousApiTest.java | 574 ++------------
.../internal/sql/engine/ItCreateTableDdlTest.java | 6 +-
.../internal/lang/SqlExceptionMapperUtil.java | 64 ++
.../ignite/internal/sql/api/SessionImpl.java | 12 +-
.../internal/sql/engine/AsyncSqlCursorImpl.java | 4 +-
.../sql/engine/prepare/PrepareServiceImpl.java | 4 +-
.../internal/lang/SqlExceptionMapperUtilTest.java | 78 ++
.../internal/sql/engine/util/SqlTestUtils.java | 8 +
17 files changed, 623 insertions(+), 1679 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
index e5b43059aa..fbae12e574 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
@@ -52,8 +52,8 @@ public class IgniteExceptionMapperUtil {
*
* @param mapper Exception mapper from internal exception to a public one.
* @param registeredMappings Already registered mappings.
- * @throws IgniteException If a mapper for the given {@code clazz} already
registered,
- * or {@code clazz} represents Java standard exception like {@link
NullPointerException}, {@link IllegalArgumentException}.
+ * @throws IgniteException If a mapper for the given {@code clazz} already
registered, or {@code clazz} represents Java standard
+ * exception like {@link NullPointerException}, {@link
IllegalArgumentException}.
*/
static void registerMapping(
IgniteExceptionMapper<?, ?> mapper,
@@ -87,23 +87,30 @@ public class IgniteExceptionMapperUtil {
if (origin instanceof AssertionError) {
return new IgniteException(INTERNAL_ERR, origin);
}
-
return origin;
}
- IgniteExceptionMapper<? extends Exception, ? extends Exception> m =
EXCEPTION_CONVERTERS.get(origin.getClass());
+ Throwable res;
+
+ // Try to find appropriate mapper, moving from original class to
supper-classes step by step.
+ Class exceptionClass = origin.getClass();
+ IgniteExceptionMapper<? extends Exception, ? extends Exception> m;
+ while ((m = EXCEPTION_CONVERTERS.get(exceptionClass)) == null &&
exceptionClass != Throwable.class) {
+ exceptionClass = exceptionClass.getSuperclass();
+ }
+
if (m != null) {
- Exception mapped = map(m, origin);
+ res = map(m, origin);
- assert mapped instanceof IgniteException || mapped instanceof
IgniteCheckedException :
- "Unexpected mapping of internal exception to a public one
[origin=" + origin + ", mapped=" + mapped + ']';
+ assert res instanceof IgniteException || res instanceof
IgniteCheckedException :
+ "Unexpected mapping of internal exception to a public one
[origin=" + origin + ", mapped=" + res + ']';
- return mapped;
+ } else {
+ res = origin;
}
- if (origin instanceof IgniteException || origin instanceof
IgniteCheckedException) {
-
- return origin;
+ if (res instanceof IgniteException || res instanceof
IgniteCheckedException) {
+ return res;
}
// There are no exception mappings for the given exception. This case
should be considered as internal error.
@@ -111,8 +118,8 @@ public class IgniteExceptionMapperUtil {
}
/**
- * Returns a new CompletableFuture that, when the given {@code origin}
future completes exceptionally,
- * maps the origin's exception to a public Ignite exception if it is
needed.
+ * Returns a new CompletableFuture that, when the given {@code origin}
future completes exceptionally, maps the origin's exception to a
+ * public Ignite exception if it is needed.
*
* @param origin The future to use to create a new stage.
* @param <T> Type os result.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
index 0854513ad3..3acd814f7b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SyncResultSetAdapter.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Synchronous wrapper over {@link org.apache.ignite.sql.async.AsyncResultSet}.
*/
-class SyncResultSetAdapter<T> implements ResultSet<T> {
+public class SyncResultSetAdapter<T> implements ResultSet<T> {
/** Wrapped async result set. */
private final AsyncResultSet<T> ars;
@@ -44,7 +44,7 @@ class SyncResultSetAdapter<T> implements ResultSet<T> {
*
* @param ars Asynchronous result set.
*/
- SyncResultSetAdapter(AsyncResultSet<T> ars) {
+ public SyncResultSetAdapter(AsyncResultSet<T> ars) {
assert ars != null;
this.ars = ars;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 8b5ff71468..7fa4842d28 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -334,18 +334,14 @@ namespace Apache.Ignite.Tests.Sql
public void TestInvalidSqlThrowsException()
{
var ex = Assert.ThrowsAsync<SqlException>(async () => await
Client.Sql.ExecuteAsync(null, "select x from bad"));
- StringAssert.Contains("Invalid query, check inner exceptions for
details: select x from bad", ex!.Message);
- var innerEx = ex.InnerException;
- Assert.IsInstanceOf<SqlException>(innerEx);
- StringAssert.Contains("From line 1, column 15 to line 1, column
17: Object 'BAD' not found", innerEx!.Message);
+ StringAssert.Contains("From line 1, column 15 to line 1, column
17: Object 'BAD' not found", ex!.Message);
}
[Test]
public void TestCreateTableExistsThrowsException()
{
- // TODO: IGNITE-20388 Fix it
- var ex = Assert.ThrowsAsync<IgniteException>(
+ var ex = Assert.ThrowsAsync<SqlException>(
async () => await Client.Sql.ExecuteAsync(null, "CREATE TABLE
TEST(ID INT PRIMARY KEY)"));
StringAssert.Contains("Table with name 'PUBLIC.TEST' already
exists", ex!.Message);
@@ -354,8 +350,7 @@ namespace Apache.Ignite.Tests.Sql
[Test]
public void TestAlterTableNotFoundThrowsException()
{
- // TODO: IGNITE-20388 Fix it
- var ex = Assert.ThrowsAsync<IgniteException>(
+ var ex = Assert.ThrowsAsync<SqlException>(
async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE
NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"));
StringAssert.Contains("Table with name 'PUBLIC.NOT_EXISTS_TABLE'
not found", ex!.Message);
@@ -364,11 +359,10 @@ namespace Apache.Ignite.Tests.Sql
[Test]
public void TestAlterTableColumnExistsThrowsException()
{
- // TODO: IGNITE-20388 Fix it
var ex = Assert.ThrowsAsync<SqlException>(
async () => await Client.Sql.ExecuteAsync(null, "ALTER TABLE
TEST ADD COLUMN ID INT"));
- StringAssert.Contains("Invalid query, check inner exceptions for
details: ALTER TABLE TEST ADD COLUMN ID INT", ex!.Message);
+ StringAssert.Contains("Failed to validate query. Column with name
'ID' already exists", ex!.Message);
}
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index ddc3735734..cac8f1529b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -144,7 +144,7 @@ namespace Apache.Ignite.Internal.Sql
// ResultSet will dispose the pooled buffer.
return new ResultSet<T>(socket, buf, rowReaderFactory);
}
- catch (SqlException e) when (e.Code ==
ErrorGroups.Sql.StmtValidation || e.Code == ErrorGroups.Sql.StmtParse)
+ catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtParse)
{
throw new SqlException(
e.TraceId,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 7a4c8ff7c3..8b72c52abf 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.runner.app;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static
org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -42,7 +43,6 @@ import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.IndexExistsValidationException;
import org.apache.ignite.internal.catalog.TableExistsValidationException;
-import org.apache.ignite.internal.catalog.TableNotFoundValidationException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.sql.Session;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -140,8 +141,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
Table tbl = createTable(ignite0, TABLE_NAME);
- // TODO: IGNITE-20388 Fix it
- assertThrowsWithCause(() -> createTable(ignite0, TABLE_NAME),
TableExistsValidationException.class);
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "Table with name 'PUBLIC.TBL1' already exists",
+ () -> createTable(ignite0, TABLE_NAME));
assertEquals(tbl, createTableIfNotExists(ignite0, TABLE_NAME));
}
@@ -152,7 +155,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
* @throws Exception If failed.
*/
@Test
- public void testTableAlreadyCreatedFromLaggedNode() throws Exception {
+ public void testTableAlreadyCreatedFromLaggedNode() {
clusterNodes.forEach(ign ->
assertNull(ign.tables().table(TABLE_NAME)));
Ignite ignite0 = clusterNodes.get(0);
@@ -170,8 +173,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
for (Ignite ignite : clusterNodes) {
if (ignite != ignite1) {
- // TODO: IGNITE-20388 Fix it
- assertThrowsWithCause(() -> createTable(ignite, TABLE_NAME),
TableExistsValidationException.class);
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "Table with name 'PUBLIC.TBL1' already exists",
+ () -> createTable(ignite, TABLE_NAME));
assertNotNull(createTableIfNotExists(ignite, TABLE_NAME));
}
@@ -182,7 +187,6 @@ public class ItTablesApiTest extends IgniteAbstractTest {
ignite1Inhibitor.stopInhibit();
- // TODO: IGNITE-20388 Fix it
assertThat(createTblFut,
willThrowWithCauseOrSuppressed(TableExistsValidationException.class));
assertThat(createTblIfNotExistsFut, willCompleteSuccessfully());
}
@@ -328,8 +332,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
for (Ignite ignite : clusterNodes) {
if (ignite != ignite1) {
- // TODO: IGNITE-20388 Fix it
- assertThrowsWithCause(() -> addColumn(ignite, TABLE_NAME),
CatalogValidationException.class);
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "Failed to validate query. Column with name 'VALINT3'
already exists",
+ () -> addColumn(ignite, TABLE_NAME));
}
}
@@ -337,7 +343,6 @@ public class ItTablesApiTest extends IgniteAbstractTest {
ignite1Inhibitor.stopInhibit();
- // TODO: IGNITE-20388 Fix it
assertThat(addColFut,
willThrowWithCauseOrSuppressed(CatalogValidationException.class));
}
@@ -400,8 +405,10 @@ public class ItTablesApiTest extends IgniteAbstractTest {
assertNull(((IgniteTablesInternal) ignite.tables()).table(tblId));
- // TODO: IGNITE-20388 Fix it
- assertThrowsWithCause(() -> dropTable(ignite, TABLE_NAME),
TableNotFoundValidationException.class);
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "Table with name 'PUBLIC.TBL1' not found",
+ () -> dropTable(ignite, TABLE_NAME));
dropTableIfExists(ignite, TABLE_NAME);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
similarity index 67%
copy from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
copy to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 069db0aab4..5f94a23702 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -19,41 +19,28 @@ package org.apache.ignite.internal.sql.api;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
+import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.asStream;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
-import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
import org.apache.ignite.lang.ErrorGroups;
@@ -71,27 +58,30 @@ import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.CursorClosedException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.NoRowSetExpectedException;
+import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matcher;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
- * Tests for asynchronous SQL API.
+ * Tests for SQL API.
+ * Tests will be run through synchronous, asynchronous API and client entry
points.
+ * By default, any SQL API test should be added to the base class and use
special provided methods to interact
+ * with the API in a API-type-independent manner. For any API-specific test,
should be used the appropriate subclass.
*/
-@SuppressWarnings("ThrowableNotThrown")
-public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
- private static final int ROW_COUNT = 16;
+public abstract class ItSqlApiBaseTest extends ClusterPerClassIntegrationTest {
+ protected static final int ROW_COUNT = 16;
@AfterEach
public void dropTables() {
@@ -238,31 +228,11 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
);
}
- @Test
- public void dml() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.createSession();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
- checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
- }
-
/** Check all transactions are processed correctly even with case of sql
Exception raised. */
@Test
public void implicitTransactionsStates() {
IgniteSql sql = igniteSql();
- if (sql instanceof ClientSql) {
- return;
- }
-
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
Session ses = sql.createSession();
@@ -270,59 +240,22 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
TxManager txManager = txManager();
for (int i = 0; i < ROW_COUNT; ++i) {
- CompletableFuture<AsyncResultSet<SqlRow>> fut =
ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
-
- AsyncResultSet asyncRes = null;
-
- try {
- asyncRes = await(fut);
- } catch (Throwable ignore) {
- // No op.
- }
-
- if (asyncRes != null) {
- await(asyncRes.closeAsync());
- }
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "Table with name 'PUBLIC.TEST' already exists",
+ () -> execute(ses, "CREATE TABLE TEST(ID INT PRIMARY KEY,
VAL0 INT)")
+ );
}
// No new transactions through ddl.
assertEquals(0, txManager.pending());
}
- /** Check correctness of explicit transaction rollback. */
- @Test
- public void checkExplicitTxRollback() {
- IgniteSql sql = igniteSql();
-
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- Session ses = sql.createSession();
-
- // Outer tx with further commit.
- Transaction outerTx = igniteTx().begin();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
- }
-
- await(outerTx.rollbackAsync());
-
- AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM
TEST ORDER BY VAL0"));
-
- assertEquals(0, StreamSupport.stream(rs.currentPage().spliterator(),
false).count());
-
- await(rs.closeAsync());
- }
-
/** Check correctness of implicit and explicit transactions. */
@Test
public void checkTransactionsWithDml() {
IgniteSql sql = igniteSql();
- if (sql instanceof ClientSql) {
- return;
- }
-
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
Session ses = sql.createSession();
@@ -339,36 +272,36 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
Transaction outerTx = igniteTx().begin();
for (int i = ROW_COUNT; i < 2 * ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
+ checkDml(1, outerTx, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
}
- outerTx.commit();
+ commit(outerTx);
// Outdated tx.
Transaction outerTx0 = outerTx;
- //ToDo: IGNITE-20387 , here should be used assertThrowsSqlException
method with code and message `"Transaction is already finished"
- IgniteException e = assertThrows(IgniteException.class,
- () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)",
outerTx0, ROW_COUNT, Integer.MAX_VALUE));
- assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
e.code());
+ assertThrowsSqlException(
+ Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
+ "Transaction is already finished",
+ () -> checkDml(1, outerTx0, ses, "INSERT INTO TEST VALUES (?,
?)", ROW_COUNT, Integer.MAX_VALUE));
assertThrowsSqlException(
Sql.CONSTRAINT_VIOLATION_ERR,
"PK unique constraint is violated",
() -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)",
ROW_COUNT, Integer.MAX_VALUE));
- AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM
TEST ORDER BY VAL0"));
+ ResultSet<SqlRow> rs = executeForRead(ses, "SELECT VAL0 FROM TEST
ORDER BY VAL0");
- assertEquals(2 * ROW_COUNT,
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+ assertEquals(2 * ROW_COUNT, asStream(rs).count());
- rs.closeAsync();
+ rs.close();
outerTx = igniteTx().begin();
- rs = await(ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY
VAL0"));
+ rs = executeForRead(ses, outerTx, "SELECT VAL0 FROM TEST ORDER BY
VAL0");
- assertEquals(2 * ROW_COUNT,
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+ assertEquals(2 * ROW_COUNT, asStream(rs).count());
- rs.closeAsync();
+ rs.close();
outerTx.commit();
@@ -381,6 +314,32 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
assertEquals(0, txManagerInternal.pending());
}
+ /** Check correctness of explicit transaction rollback. */
+ @Test
+ public void checkExplicitTxRollback() {
+ IgniteSql sql = igniteSql();
+
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ Session ses = sql.createSession();
+
+ // Outer tx with further commit.
+ Transaction outerTx = igniteTx().begin();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ checkDml(1, outerTx, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ rollback(outerTx);
+
+ ResultSet<SqlRow> rs = executeForRead(ses, "SELECT VAL0 FROM TEST
ORDER BY VAL0");
+
+ asStream(rs);
+ assertEquals(0, asStream(rs).count());
+
+ rs.close();
+ }
+
/** Check correctness of rw and ro transactions for table scan. */
@Test
public void checkMixedTransactionsForTable() {
@@ -391,6 +350,7 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
checkMixedTransactions(planMatcher);
}
+
/** Check correctness of rw and ro transactions for index scan. */
@Test
public void checkMixedTransactionsForIndex() throws Exception {
@@ -405,10 +365,6 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
private void checkMixedTransactions(Matcher<String> planMatcher) {
IgniteSql sql = igniteSql();
- if (sql instanceof ClientSql) {
- return;
- }
-
Session ses = sql.createSession();
for (int i = 0; i < ROW_COUNT; ++i) {
@@ -432,11 +388,11 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
assertQuery(outerTx, query).matches(planMatcher).check();
- AsyncResultSet rs = await(ses.executeAsync(outerTx, query));
+ ResultSet<SqlRow> rs = executeForRead(ses, outerTx, query);
- assertEquals(ROW_COUNT,
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
+ assertEquals(ROW_COUNT, asStream(rs).count());
- rs.closeAsync();
+ rs.close();
if (outerTx != null) {
if (commit) {
@@ -447,29 +403,6 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
}
}
- @Test
- public void select() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
4).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- TestPageProcessor pageProc = new TestPageProcessor(4);
- await(ses.executeAsync(null, "SELECT ID FROM
TEST").thenCompose(pageProc));
-
- Set<Integer> rs = pageProc.result().stream().map(r ->
r.intValue(0)).collect(Collectors.toSet());
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- assertTrue(rs.remove(i), "Results invalid: " + pageProc.result());
- }
-
- assertTrue(rs.isEmpty());
- }
-
@Test
public void metadata() {
sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT
NULL)");
@@ -477,9 +410,9 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().build();
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
+ execute(ses, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
- AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1,
COL0 FROM TEST"));
+ ResultSet<SqlRow> rs = executeForRead(ses, "SELECT COL1, COL0 FROM
TEST");
// Validate columns metadata.
ResultSetMetadata meta = rs.metadata();
@@ -508,11 +441,10 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
// Validate result columns types.
assertTrue(rs.hasRowSet());
- assertEquals(1, rs.currentPageSize());
- SqlRow row = rs.currentPage().iterator().next();
+ SqlRow row = rs.next();
- await(rs.closeAsync());
+ rs.close();
assertInstanceOf(meta.columns().get(0).valueClass(), row.value(0));
assertInstanceOf(meta.columns().get(1).valueClass(), row.value(1));
@@ -523,9 +455,9 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().build();
- AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as
COL_A, 2 as COL_B"));
+ ResultSet<SqlRow> rs = executeForRead(ses, "SELECT 1 as COL_A, 2 as
COL_B");
- SqlRow r = CollectionUtils.first(ars.currentPage());
+ SqlRow r = rs.next();
assertEquals(2, r.columnCount());
assertEquals(0, r.columnIndex("COL_A"));
@@ -548,59 +480,42 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
assertThrowsWithCause(() -> r.intValue(-2),
IndexOutOfBoundsException.class);
assertThrowsWithCause(() -> r.intValue(10),
IndexOutOfBoundsException.class);
- await(ars.closeAsync());
+ rs.close();
}
@Test
- public void pageSequence() {
+ public void closeSession() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(1).build();
+ Session ses = sql.sessionBuilder().defaultPageSize(2).build();
for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
}
- AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID
FROM TEST ORDER BY ID"));
- var p0 = ars0.currentPage();
- AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage());
- var p1 = ars1.currentPage();
- AsyncResultSet<SqlRow> ars2 =
await(ars1.fetchNextPage().toCompletableFuture());
- var p2 = ars2.currentPage();
- AsyncResultSet<SqlRow> ars3 = await(ars1.fetchNextPage());
- var p3 = ars3.currentPage();
- AsyncResultSet<SqlRow> ars4 = await(ars0.fetchNextPage());
- var p4 = ars4.currentPage();
-
- assertSame(ars0, ars1);
- assertSame(ars0, ars2);
- assertSame(ars0, ars3);
- assertSame(ars0, ars4);
+ ResultSet rs = executeForRead(ses, "SELECT ID FROM TEST");
- List<SqlRow> res = Stream.of(p0, p1, p2, p3, p4)
- .flatMap(p -> StreamSupport.stream(p.spliterator(), false))
- .collect(Collectors.toList());
+ ses.close();
- TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT -
res.size());
- await(ars4.fetchNextPage().thenCompose(pageProc));
-
- res.addAll(pageProc.result());
+ SqlException sqlEx = assertThrowsSqlException(
+ Sql.EXECUTION_CANCELLED_ERR,
+ "The query was cancelled while executing",
+ () -> rs.forEachRemaining(System.out::println));
- for (int i = 0; i < ROW_COUNT; ++i) {
- assertEquals(i, res.get(i).intValue(0));
- }
+ assertTrue(IgniteTestUtils.hasCause(sqlEx,
QueryCancelledException.class, null));
+ assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed",
() -> execute(ses, "SELECT ID FROM TEST"));
}
@Test
- public void errors() {
+ public void errors() throws InterruptedException {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
+ Session ses = sql.sessionBuilder().defaultPageSize(2).build();
for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
}
// Parse error.
@@ -628,87 +543,71 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
// No result set error.
{
- AsyncResultSet ars = await(ses.executeAsync(null, "CREATE TABLE
TEST3 (ID INT PRIMARY KEY)"));
+ ResultSet rs = executeForRead(ses, "CREATE TABLE TEST3 (ID INT
PRIMARY KEY)");
assertThrowsSqlException(
NoRowSetExpectedException.class,
Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set",
- () -> await(ars.fetchNextPage()));
+ () -> rs.next());
}
// Cursor closed error.
{
- AsyncResultSet ars = await(ses.executeAsync(null, "SELECT * FROM
TEST"));
- await(ars.closeAsync());
+ ResultSet rs = executeForRead(ses, "SELECT * FROM TEST");
+ Thread.sleep(300); // ResultSetImpl fetches next page in
background, wait to it to complete to avoid flakiness.
+ rs.close();
assertThrowsSqlException(
CursorClosedException.class,
Sql.CURSOR_CLOSED_ERR,
"Cursor is closed",
- () -> await(ars.fetchNextPage()));
+ () -> rs.forEachRemaining(Object::hashCode));
}
}
- /**
- * DDL is non-transactional.
- */
@Test
- public void ddlInTransaction() {
- Session ses = igniteSql().createSession();
+ public void dml() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- {
- Transaction tx = igniteTx().begin();
- try {
- assertThrowsSqlException(
- Sql.STMT_VALIDATION_ERR,
- "DDL doesn't support transactions.",
- () -> await(ses.executeAsync(tx, "CREATE TABLE
TEST2(ID INT PRIMARY KEY, VAL0 INT)"))
- );
- } finally {
- tx.rollback();
- }
- }
- {
- Transaction tx = igniteTx().begin();
- AsyncResultSet<SqlRow> res = await(ses.executeAsync(tx, "INSERT
INTO TEST VALUES (?, ?)", -1, -1));
- assertEquals(1, res.affectedRows());
+ IgniteSql sql = igniteSql();
+ Session ses = sql.createSession();
- assertThrowsSqlException(
- Sql.STMT_VALIDATION_ERR,
- "DDL doesn't support transactions.",
- () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID
INT PRIMARY KEY, VAL0 INT)"))
- );
- tx.commit();
+ TxManager txManager = txManager();
- assertTrue(await(ses.executeAsync(null, "SELECT ID FROM TEST WHERE
ID = -1")).currentPage().iterator().hasNext());
+ int txPrevCnt = txManager.finished();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
}
+
+ assertEquals(ROW_COUNT, txManager.finished() - txPrevCnt);
+ // No new transactions through ddl.
+ assertEquals(0, txManager.pending());
+
+ checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+
+ checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
}
@Test
- public void closeSession() {
+ public void select() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+ Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
4).build();
for (int i = 0; i < ROW_COUNT; ++i) {
ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
}
- AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM
TEST"));
+ ResultProcessor resultProcessor = execute(4, ses, "SELECT ID FROM
TEST");
- await(ses.closeAsync());
+ Set<Integer> rs = resultProcessor.result().stream().map(r ->
r.intValue(0)).collect(Collectors.toSet());
- // Fetched page is available after cancel.
- ars0.currentPage();
-
- SqlException sqlEx = assertThrowsSqlException(
- Sql.EXECUTION_CANCELLED_ERR,
- "The query was cancelled while executing",
- () -> await(ars0.fetchNextPage()));
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ assertTrue(rs.remove(i), "Results invalid: " +
resultProcessor.result());
+ }
- assertTrue(IgniteTestUtils.hasCause(sqlEx,
QueryCancelledException.class, null));
- assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed",
() -> await(ses.executeAsync(null, "SELECT ID FROM TEST")));
+ assertTrue(rs.isEmpty());
}
@Test
@@ -724,7 +623,7 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
args.add(i, i);
}
- long[] batchRes = await(ses.executeBatchAsync(null, "INSERT INTO TEST
VALUES (?, ?)", args));
+ long[] batchRes = executeBatch(ses, "INSERT INTO TEST VALUES (?, ?)",
args);
Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
@@ -737,13 +636,13 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
SqlBatchException.class,
Sql.STMT_VALIDATION_ERR,
"Invalid SQL statement type",
- () -> await(ses.executeBatchAsync(null, "SELECT * FROM TEST",
args)));
+ () -> executeBatch(ses, "SELECT * FROM TEST", args));
assertThrowsSqlException(
SqlBatchException.class,
Sql.STMT_VALIDATION_ERR,
"Invalid SQL statement type",
- () -> await(ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID
INT PRIMARY KEY, VAL0 INT)", args)));
+ () -> executeBatch(ses, "CREATE TABLE TEST1(ID INT PRIMARY
KEY, VAL0 INT)", args));
}
@Test
@@ -769,33 +668,99 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
SqlBatchException.class,
Sql.CONSTRAINT_VIOLATION_ERR,
"PK unique constraint is violated",
- () -> await(ses.executeBatchAsync(null, "INSERT INTO TEST
VALUES (?, ?)", args))
+ () -> executeBatch(ses, "INSERT INTO TEST VALUES (?, ?)", args)
);
assertEquals(err, ex.updateCounters().length);
IntStream.range(0, ex.updateCounters().length).forEach(i ->
assertEquals(1, ex.updateCounters()[i]));
}
- @Test
- public void resultSetCloseShouldFinishImplicitTransaction() throws
InterruptedException {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "INSERT INTO tst VALUES (2, ?)",
+ "SELECT * FROM tst WHERE id = ? "
+ })
+ public void runtimeErrorInDmlCausesTransactionToFail(String query) {
+ sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
+ sql("INSERT INTO tst VALUES (?,?)", 1, 1);
+
+ try (Session ses = igniteSql().createSession()) {
+ Transaction tx = igniteTx().begin();
+ String dmlQuery = "UPDATE tst SET val = val/(val - ?) + 1";
+
+ assertThrowsSqlException(
+ Sql.RUNTIME_ERR,
+ "/ by zero",
+ () -> execute(tx, ses, dmlQuery, 1).affectedRows());
+
+ IgniteException err = assertThrows(IgniteException.class, () -> {
+ ResultSet<SqlRow> rs = executeForRead(ses, tx, query, 2);
+ if (rs.hasRowSet()) {
+ assertTrue(rs.hasNext());
+ } else {
+ assertTrue(rs.wasApplied());
+ }
+ });
+
+ assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
err.code(), err.toString());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "INSERT INTO tst VALUES (2, ?)",
+ "SELECT * FROM tst WHERE id = ? "
+ })
+ public void runtimeErrorInQueryCausesTransactionToFail(String query) {
+ sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
+
+ sql("INSERT INTO tst VALUES (?,?)", 1, 1);
+
+ try (Session ses = igniteSql().createSession()) {
+ Transaction tx = igniteTx().begin();
+
+ assertThrowsSqlException(
+ Sql.RUNTIME_ERR,
+ "/ by zero",
+ () -> execute(tx, ses, "SELECT val/? FROM tst WHERE id=?",
0, 1));
+
+ IgniteException err = assertThrows(IgniteException.class, () -> {
+ ResultSet<SqlRow> rs = executeForRead(ses, tx, query, 2);
+ if (rs.hasRowSet()) {
+ assertTrue(rs.hasNext());
+ } else {
+ assertTrue(rs.wasApplied());
+ }
+ });
+
+ assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
err.code(), err.toString());
+ }
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
+ @Test
+ public void testLockIsNotReleasedAfterTxRollback() {
IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(2).build();
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ try (Session ses = sql.createSession()) {
+ checkDdl(true, ses, "CREATE TABLE IF NOT EXISTS tst(id INTEGER
PRIMARY KEY, val INTEGER)");
}
- CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null,
"SELECT * FROM TEST");
+ try (Session session = sql.createSession()) {
+ Transaction tx = igniteTx().begin();
+
+ assertThrows(RuntimeException.class, () -> execute(tx, session,
"SELECT 1/0"));
+ tx.rollback();
+ session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
+ }
- AsyncResultSet<SqlRow> ars = f.join();
- // There should be a pending transaction since not all data was read.
- boolean txStarted = waitForCondition(() -> txManager().pending() == 1,
5000);
- assertTrue(txStarted, "No pending transactions");
+ try (Session session = sql.createSession()) {
+ Transaction tx = igniteTx().begin(new
TransactionOptions().readOnly(false));
- ars.closeAsync().join();
- assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ execute(tx, session, "INSERT INTO tst VALUES (1, 1)");
+ tx.commit();
+ }
}
@Test
@@ -803,127 +768,141 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
IgniteSql sql = igniteSql();
-
// Fetch all data in one read.
Session ses = sql.sessionBuilder().defaultPageSize(100).build();
-
for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
}
- CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null,
"SELECT * FROM TEST");
-
- AsyncResultSet<SqlRow> ars = f.join();
- assertFalse(ars.hasMorePages());
+ execute(1, ses, "SELECT * FROM TEST");
assertEquals(0, txManager().pending(), "Expected no pending
transactions");
}
- private static void checkDdl(boolean expectedApplied, Session ses, String
sql, Transaction tx) {
- CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
- tx,
- sql
- );
+ /**
+ * DDL is non-transactional.
+ */
+ @Test
+ public void ddlInTransaction() {
+ Session ses = igniteSql().createSession();
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- AsyncResultSet<SqlRow> asyncRes = await(fut);
+ {
+ Transaction tx = igniteTx().begin();
+ try {
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "DDL doesn't support transactions.",
+ () -> execute(tx, ses, "CREATE TABLE TEST2(ID INT
PRIMARY KEY, VAL0 INT)")
+ );
+ } finally {
+ tx.rollback();
+ }
+ }
+ {
+ Transaction tx = igniteTx().begin();
+ ResultProcessor result = execute(tx, ses, "INSERT INTO TEST VALUES
(?, ?)", -1, -1);
+ assertEquals(1, result.affectedRows());
- assertEquals(expectedApplied, asyncRes.wasApplied());
- assertFalse(asyncRes.hasMorePages());
- assertFalse(asyncRes.hasRowSet());
- assertEquals(-1, asyncRes.affectedRows());
+ assertThrowsSqlException(
+ Sql.STMT_VALIDATION_ERR,
+ "DDL doesn't support transactions.",
+ () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY
KEY, VAL0 INT)")
+ );
+ tx.commit();
- assertNull(asyncRes.metadata());
+ assertEquals(1, execute(ses, "SELECT ID FROM TEST WHERE ID =
-1").result().size());
+ }
- await(asyncRes.closeAsync());
+ assertEquals(0, txManager().pending());
}
- private static void checkDdl(boolean expectedApplied, Session ses, String
sql) {
- checkDdl(expectedApplied, ses, sql, null);
+ @Test
+ public void resultSetCloseShouldFinishImplicitTransaction() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ IgniteSql sql = igniteSql();
+ Session ses = sql.sessionBuilder().defaultPageSize(2).build();
+
+ for (int i = 0; i < ROW_COUNT; ++i) {
+ execute(ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
+ }
+
+ ResultSet<?> rs = executeForRead(ses, "SELECT * FROM TEST");
+ assertEquals(1, txManager().pending());
+ rs.close();
+ assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ }
+
+ protected ResultSet<SqlRow> executeForRead(Session ses, String query) {
+ return executeForRead(ses, null, query);
}
- private static <T extends IgniteException> T checkError(Class<T> expCls,
Integer code, String msg, Session ses, String sql,
+ protected abstract ResultSet<SqlRow> executeForRead(Session ses,
Transaction tx, String query, Object... args);
+
+ protected <T extends IgniteException> T checkError(Class<T> expCls,
Integer code, String msg, Session ses, String sql,
Object... args) {
- return assertThrowsPublicException(() -> await(ses.executeAsync(null,
sql, args)), expCls, code, msg);
+ T ex = assertThrows(expCls, () -> execute(ses, sql, args));
+
+ if (code != null) {
+ assertEquals(new IgniteException(code).codeAsString(),
ex.codeAsString());
+ }
+
+ if (msg != null) {
+ assertThat(ex.getMessage(), containsString(msg));
+ }
+
+ return ex;
}
- private static SqlException checkSqlError(
+ protected SqlException checkSqlError(
int code,
String msg,
Session ses,
String sql,
Object... args
) {
- return assertThrowsSqlException(code, msg, () ->
await(ses.executeAsync(null, sql, args)));
+ return assertThrowsSqlException(code, msg, () -> execute(ses, sql,
args));
}
- protected static void checkDml(int expectedAffectedRows, Session ses,
String sql, Transaction tx, Object... args) {
- AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args));
+ protected abstract long[] executeBatch(Session ses, String sql,
BatchedArguments args);
- assertFalse(asyncRes.wasApplied());
- assertFalse(asyncRes.hasMorePages());
- assertFalse(asyncRes.hasRowSet());
- assertEquals(expectedAffectedRows, asyncRes.affectedRows());
+ protected abstract ResultProcessor execute(Integer expectedPages,
Transaction tx, Session ses, String sql, Object... args);
- assertNull(asyncRes.metadata());
-
- await(asyncRes.closeAsync());
+ protected ResultProcessor execute(int expectedPages, Session ses, String
sql, Object... args) {
+ return execute(expectedPages, null, ses, sql, args);
}
- private static void checkDml(int expectedAffectedRows, Session ses, String
sql, Object... args) {
- checkDml(expectedAffectedRows, ses, sql, null, args);
+ protected ResultProcessor execute(Transaction tx, Session ses, String sql,
Object... args) {
+ return execute(null, tx, ses, sql, args);
}
- static <T extends IgniteException> T assertThrowsPublicException(
- RunnableX executable,
- Class<T> expCls,
- @Nullable Integer code,
- @Nullable String msgPart
- ) {
- T ex = assertThrows(expCls, executable::run);
-
- if (code != null) {
- assertEquals(new IgniteException(code).codeAsString(),
ex.codeAsString());
- }
-
- if (msgPart != null) {
- assertThat(ex.getMessage(), containsString(msgPart));
- }
-
- return ex;
+ protected ResultProcessor execute(Session ses, String sql, Object... args)
{
+ return execute(null, null, ses, sql, args);
}
- static class TestPageProcessor implements
- Function<AsyncResultSet<SqlRow>,
CompletionStage<AsyncResultSet<SqlRow>>> {
- private int expectedPages;
+ protected abstract void rollback(Transaction outerTx);
- private final List<SqlRow> res = new ArrayList<>();
+ protected abstract void commit(Transaction outerTx);
- TestPageProcessor(int expectedPages) {
- this.expectedPages = expectedPages;
- }
+ protected void checkDml(int expectedAffectedRows, Transaction tx, Session
ses, String sql, Object... args) {
- @Override
- public CompletionStage<AsyncResultSet<SqlRow>>
apply(AsyncResultSet<SqlRow> rs) {
- expectedPages--;
+ }
- assertTrue(rs.hasRowSet());
- assertFalse(rs.wasApplied());
- assertEquals(-1L, rs.affectedRows());
- assertEquals(expectedPages > 0, rs.hasMorePages(),
- "hasMorePages(): [expected=" + (expectedPages > 0) + ",
actual=" + rs.hasMorePages() + ']');
+ protected void checkDml(int expectedAffectedRows, Session ses, String sql,
Object... args) {
+ checkDml(expectedAffectedRows, null, ses, sql, args);
+ }
- rs.currentPage().forEach(res::add);
+ protected void checkDdl(boolean expectedApplied, Session ses, String sql) {
+ checkDdl(expectedApplied, ses, sql, null);
+ }
- if (rs.hasMorePages()) {
- return rs.fetchNextPage().thenCompose(this);
- }
+ protected abstract void checkDdl(boolean expectedApplied, Session ses,
String sql, Transaction tx);
- return rs.closeAsync().thenApply(v -> rs);
- }
+ /** Represent result of running SQL query to hide implementation specific
for different version of tests. */
+ protected interface ResultProcessor {
+ List<SqlRow> result();
- public List<SqlRow> result() {
- //noinspection AssignmentOrReturnOfFieldWithMutableType
- return res;
- }
+ long affectedRows();
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 069db0aab4..77d45cc8f6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -17,540 +17,36 @@
package org.apache.ignite.internal.sql.api;
-import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
-import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
-import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.apache.ignite.internal.catalog.commands.CatalogUtils;
-import org.apache.ignite.internal.client.sql.ClientSql;
-import org.apache.ignite.internal.sql.api.ColumnMetadataImpl.ColumnOriginImpl;
-import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.sql.engine.QueryCancelledException;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.util.CollectionUtils;
-import org.apache.ignite.lang.ColumnAlreadyExistsException;
-import org.apache.ignite.lang.ColumnNotFoundException;
-import org.apache.ignite.lang.ErrorGroups;
-import org.apache.ignite.lang.ErrorGroups.Index;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
-import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.lang.TableAlreadyExistsException;
-import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.internal.sql.SyncResultSetAdapter;
import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.ColumnMetadata;
-import org.apache.ignite.sql.ColumnType;
-import org.apache.ignite.sql.CursorClosedException;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.NoRowSetExpectedException;
-import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.SqlBatchException;
-import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
-import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionOptions;
-import org.hamcrest.Matcher;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Tests for asynchronous SQL API.
*/
@SuppressWarnings("ThrowableNotThrown")
-public class ItSqlAsynchronousApiTest extends ClusterPerClassIntegrationTest {
- private static final int ROW_COUNT = 16;
-
- @AfterEach
- public void dropTables() {
- for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
- sql("DROP TABLE " + t.name());
- }
- }
-
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20096")
- public void ddl() throws Exception {
- IgniteSql sql = igniteSql();
- Session ses = sql.createSession();
-
- // CREATE TABLE
- checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- checkError(
- TableAlreadyExistsException.class,
- ErrorGroups.Table.TABLE_ALREADY_EXISTS_ERR,
- "Table already exists [name=\"PUBLIC\".\"TEST\"]",
- ses,
- "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
- );
- checkSqlError(
- ErrorGroups.Table.TABLE_DEFINITION_ERR,
- "Can't create table with duplicate columns: ID, VAL, VAL",
- ses,
- "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL INT, VAL INT)"
- );
- checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY
KEY, VAL VARCHAR)");
-
- // ADD COLUMN
- checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN VAL1 VARCHAR");
- checkError(
- TableNotFoundException.class,
- ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
- "The table does not exist
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
- ses,
- "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
- );
- checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD
COLUMN VAL1 VARCHAR");
- checkError(
- ColumnAlreadyExistsException.class,
- ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR,
- "Column already exists [name=\"VAL1\"]",
- ses,
- "ALTER TABLE TEST ADD COLUMN VAL1 INT"
- );
-
- // CREATE INDEX
- checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
- checkError(
- IndexAlreadyExistsException.class,
- Index.INDEX_ALREADY_EXISTS_ERR,
- "Index already exists [name=\"PUBLIC\".\"TEST_IDX\"]",
- ses,
- "CREATE INDEX TEST_IDX ON TEST(VAL1)"
- );
- checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON
TEST(VAL1)");
-
- // TODO: IGNITE-19150 We are waiting for schema synchronization to
avoid races to create and destroy indexes
- waitForIndexBuild("TEST", "TEST_IDX");
-
- checkDdl(true, ses, "DROP INDEX TESt_iDX");
- checkDdl(true, ses, "CREATE INDEX TEST_IDX1 ON TEST(VAL0)");
- checkDdl(true, ses, "CREATE INDEX TEST_IDX2 ON TEST(VAL0)");
- checkDdl(true, ses, "CREATE INDEX TEST_IDX3 ON TEST(ID, VAL0, VAL1)");
- checkSqlError(
- Index.INVALID_INDEX_DEFINITION_ERR,
- "Can't create index on duplicate columns: VAL0, VAL0",
- ses,
- "CREATE INDEX TEST_IDX4 ON TEST(VAL0, VAL0)"
- );
-
- checkSqlError(
- Sql.STMT_VALIDATION_ERR,
- "Can`t delete column(s). Column VAL1 is used by indexes
[TEST_IDX3].",
- ses,
- "ALTER TABLE TEST DROP COLUMN val1"
- );
-
- SqlException ex = checkSqlError(
- Sql.STMT_VALIDATION_ERR,
- "Can`t delete column(s).",
- ses,
- "ALTER TABLE TEST DROP COLUMN (val0, val1)"
- );
-
- String msg = ex.getMessage();
- String explainMsg = "Unexpected error message: " + msg;
-
- assertTrue(msg.contains("Column VAL0 is used by indexes ["),
explainMsg);
- assertTrue(msg.contains("TEST_IDX1") && msg.contains("TEST_IDX2") &&
msg.contains("TEST_IDX3"), explainMsg);
- assertTrue(msg.contains("Column VAL1 is used by indexes [TEST_IDX3]"),
explainMsg);
-
- checkSqlError(
- Sql.STMT_VALIDATION_ERR,
- "Can`t delete column, belongs to primary key: [name=ID]",
- ses,
- "ALTER TABLE TEST DROP COLUMN id"
- );
-
- // TODO: IGNITE-19150 We are waiting for schema synchronization to
avoid races to create and destroy indexes
- waitForIndexBuild("TEST", "TEST_IDX3");
- checkDdl(true, ses, "DROP INDEX TESt_iDX3");
-
- // DROP COLUMNS
- checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
- checkError(
- TableNotFoundException.class,
- ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
- "The table does not exist
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
- ses,
- "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
- );
- checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP
COLUMN VAL1");
- checkError(
- ColumnNotFoundException.class,
- ErrorGroups.Table.COLUMN_NOT_FOUND_ERR,
- "Column does not exist [tableName=\"PUBLIC\".\"TEST\",
columnName=\"VAL1\"]",
- ses,
- "ALTER TABLE TEST DROP COLUMN VAL1"
- );
-
- // DROP TABLE
- checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
-
- checkDdl(true, ses, "DROP TABLE TEST");
- checkError(
- TableNotFoundException.class,
- ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
- "The table does not exist [name=\"PUBLIC\".\"TEST\"]",
- ses,
- "DROP TABLE TEST"
- );
-
- checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX");
-
- checkError(
- IndexNotFoundException.class,
- Index.INDEX_NOT_FOUND_ERR,
- "Index does not exist [name=\"PUBLIC\".\"TEST_IDX\"]", ses,
- "DROP INDEX TEST_IDX"
- );
- }
-
- @Test
- public void dml() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.createSession();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
- checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
- }
-
- /** Check all transactions are processed correctly even with case of sql
Exception raised. */
- @Test
- public void implicitTransactionsStates() {
- IgniteSql sql = igniteSql();
-
- if (sql instanceof ClientSql) {
- return;
- }
-
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- Session ses = sql.createSession();
-
- TxManager txManager = txManager();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- CompletableFuture<AsyncResultSet<SqlRow>> fut =
ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
-
- AsyncResultSet asyncRes = null;
-
- try {
- asyncRes = await(fut);
- } catch (Throwable ignore) {
- // No op.
- }
-
- if (asyncRes != null) {
- await(asyncRes.closeAsync());
- }
- }
-
- // No new transactions through ddl.
- assertEquals(0, txManager.pending());
- }
-
- /** Check correctness of explicit transaction rollback. */
- @Test
- public void checkExplicitTxRollback() {
- IgniteSql sql = igniteSql();
-
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- Session ses = sql.createSession();
-
- // Outer tx with further commit.
- Transaction outerTx = igniteTx().begin();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
- }
-
- await(outerTx.rollbackAsync());
-
- AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM
TEST ORDER BY VAL0"));
-
- assertEquals(0, StreamSupport.stream(rs.currentPage().spliterator(),
false).count());
-
- await(rs.closeAsync());
- }
-
- /** Check correctness of implicit and explicit transactions. */
- @Test
- public void checkTransactionsWithDml() {
- IgniteSql sql = igniteSql();
-
- if (sql instanceof ClientSql) {
- return;
- }
-
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- Session ses = sql.createSession();
-
- TxManager txManagerInternal = txManager();
-
- int txPrevCnt = txManagerInternal.finished();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- // Outer tx with further commit.
- Transaction outerTx = igniteTx().begin();
-
- for (int i = ROW_COUNT; i < 2 * ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", outerTx, i, i);
- }
-
- outerTx.commit();
-
- // Outdated tx.
- Transaction outerTx0 = outerTx;
- //ToDo: IGNITE-20387 , here should be used assertThrowsSqlException
method with code and message `"Transaction is already finished"
- IgniteException e = assertThrows(IgniteException.class,
- () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)",
outerTx0, ROW_COUNT, Integer.MAX_VALUE));
- assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
e.code());
-
- assertThrowsSqlException(
- Sql.CONSTRAINT_VIOLATION_ERR,
- "PK unique constraint is violated",
- () -> checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)",
ROW_COUNT, Integer.MAX_VALUE));
-
- AsyncResultSet rs = await(ses.executeAsync(null, "SELECT VAL0 FROM
TEST ORDER BY VAL0"));
-
- assertEquals(2 * ROW_COUNT,
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
-
- rs.closeAsync();
-
- outerTx = igniteTx().begin();
-
- rs = await(ses.executeAsync(outerTx, "SELECT VAL0 FROM TEST ORDER BY
VAL0"));
-
- assertEquals(2 * ROW_COUNT,
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
-
- rs.closeAsync();
-
- outerTx.commit();
-
- checkDml(2 * ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
- checkDml(2 * ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
-
- assertEquals(ROW_COUNT + 1 + 1 + 1 + 1 + 1 + 1,
txManagerInternal.finished() - txPrevCnt);
-
- assertEquals(0, txManagerInternal.pending());
- }
-
- /** Check correctness of rw and ro transactions for table scan. */
- @Test
- public void checkMixedTransactionsForTable() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
-
- checkMixedTransactions(planMatcher);
- }
-
- /** Check correctness of rw and ro transactions for index scan. */
- @Test
- public void checkMixedTransactionsForIndex() throws Exception {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
-
- Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST",
"TEST_IDX");
-
- checkMixedTransactions(planMatcher);
- }
-
- private void checkMixedTransactions(Matcher<String> planMatcher) {
- IgniteSql sql = igniteSql();
-
- if (sql instanceof ClientSql) {
- return;
- }
-
- Session ses = sql.createSession();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- List<Boolean> booleanList = List.of(Boolean.TRUE, Boolean.FALSE);
- for (boolean roTx : booleanList) {
- for (boolean commit : booleanList) {
- for (boolean explicit : booleanList) {
- checkTx(ses, roTx, commit, explicit, planMatcher);
- }
- }
- }
- }
-
- private void checkTx(Session ses, boolean readOnly, boolean commit,
boolean explicit, Matcher<String> planMatcher) {
- Transaction outerTx = explicit ? (readOnly ? igniteTx().begin(new
TransactionOptions().readOnly(true)) : igniteTx().begin()) : null;
-
- String query = "SELECT VAL0 FROM TEST ORDER BY VAL0";
-
- assertQuery(outerTx, query).matches(planMatcher).check();
-
- AsyncResultSet rs = await(ses.executeAsync(outerTx, query));
-
- assertEquals(ROW_COUNT,
StreamSupport.stream(rs.currentPage().spliterator(), false).count());
-
- rs.closeAsync();
-
- if (outerTx != null) {
- if (commit) {
- outerTx.commit();
- } else {
- outerTx.rollback();
- }
- }
- }
-
- @Test
- public void select() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
4).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- TestPageProcessor pageProc = new TestPageProcessor(4);
- await(ses.executeAsync(null, "SELECT ID FROM
TEST").thenCompose(pageProc));
-
- Set<Integer> rs = pageProc.result().stream().map(r ->
r.intValue(0)).collect(Collectors.toSet());
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- assertTrue(rs.remove(i), "Results invalid: " + pageProc.result());
- }
-
- assertTrue(rs.isEmpty());
- }
-
- @Test
- public void metadata() {
- sql("CREATE TABLE TEST(COL0 BIGINT PRIMARY KEY, COL1 VARCHAR NOT
NULL)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().build();
-
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", 1L, "some string");
-
- AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1,
COL0 FROM TEST"));
-
- // Validate columns metadata.
- ResultSetMetadata meta = rs.metadata();
-
- assertNotNull(meta);
- assertEquals(-1, meta.indexOf("COL"));
- assertEquals(0, meta.indexOf("COL1"));
- assertEquals(1, meta.indexOf("COL0"));
-
- checkMetadata(new ColumnMetadataImpl(
- "COL1",
- ColumnType.STRING,
- CatalogUtils.DEFAULT_VARLEN_LENGTH,
- ColumnMetadata.UNDEFINED_SCALE,
- false,
- new ColumnOriginImpl("PUBLIC", "TEST", "COL1")),
- meta.columns().get(0));
- checkMetadata(new ColumnMetadataImpl(
- "COL0",
- ColumnType.INT64,
- 19,
- 0,
- false,
- new ColumnOriginImpl("PUBLIC", "TEST", "COL0")),
- meta.columns().get(1));
-
- // Validate result columns types.
- assertTrue(rs.hasRowSet());
- assertEquals(1, rs.currentPageSize());
-
- SqlRow row = rs.currentPage().iterator().next();
-
- await(rs.closeAsync());
-
- assertInstanceOf(meta.columns().get(0).valueClass(), row.value(0));
- assertInstanceOf(meta.columns().get(1).valueClass(), row.value(1));
- }
-
- @Test
- public void sqlRow() {
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().build();
-
- AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as
COL_A, 2 as COL_B"));
-
- SqlRow r = CollectionUtils.first(ars.currentPage());
-
- assertEquals(2, r.columnCount());
- assertEquals(0, r.columnIndex("COL_A"));
- assertEquals(0, r.columnIndex("col_a"));
- assertEquals(1, r.columnIndex("COL_B"));
- assertEquals(-1, r.columnIndex("notExistColumn"));
-
- assertEquals(1, r.intValue("COL_A"));
- assertEquals(1, r.intValue("COL_a"));
- assertEquals(2, r.intValue("COL_B"));
-
- assertThrowsWithCause(
- () -> r.intValue("notExistColumn"),
- IllegalArgumentException.class,
- "Column doesn't exist [name=notExistColumn]"
- );
-
- assertEquals(1, r.intValue(0));
- assertEquals(2, r.intValue(1));
- assertThrowsWithCause(() -> r.intValue(-2),
IndexOutOfBoundsException.class);
- assertThrowsWithCause(() -> r.intValue(10),
IndexOutOfBoundsException.class);
-
- await(ars.closeAsync());
- }
-
+public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest {
@Test
public void pageSequence() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
@@ -582,7 +78,7 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
.flatMap(p -> StreamSupport.stream(p.spliterator(), false))
.collect(Collectors.toList());
- TestPageProcessor pageProc = new TestPageProcessor(ROW_COUNT -
res.size());
+ AsyncResultProcessor pageProc = new AsyncResultProcessor(ROW_COUNT -
res.size());
await(ars4.fetchNextPage().thenCompose(pageProc));
res.addAll(pageProc.result());
@@ -592,234 +88,50 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
}
}
- @Test
- public void errors() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- // Parse error.
- checkSqlError(Sql.STMT_PARSE_ERR, "Failed to parse query", ses,
"SELECT ID FROM");
-
- // Validation errors.
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'VAL0' does not allow
NULLs", ses,
- "INSERT INTO TEST VALUES (2, NULL)");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Object 'NOT_EXISTING_TABLE'
not found", ses,
- "SELECT * FROM NOT_EXISTING_TABLE");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'NOT_EXISTING_COLUMN'
not found", ses,
- "SELECT NOT_EXISTING_COLUMN FROM TEST");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Multiple statements are not
allowed", ses, "SELECT 1; SELECT 2");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Table without PRIMARY KEY is
not supported", ses,
- "CREATE TABLE TEST2 (VAL INT)");
-
- // Execute error.
- checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "SELECT 1 / ?", 0);
- checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "UPDATE TEST SET val0
= val0/(val0 - ?) + " + ROW_COUNT, 0);
- checkSqlError(Sql.RUNTIME_ERR, "negative substring length not
allowed", ses, "SELECT SUBSTRING('foo', 1, -3)");
-
- // No result set error.
- {
- AsyncResultSet ars = await(ses.executeAsync(null, "CREATE TABLE
TEST3 (ID INT PRIMARY KEY)"));
- assertThrowsSqlException(
- NoRowSetExpectedException.class,
- Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set",
- () -> await(ars.fetchNextPage()));
- }
-
- // Cursor closed error.
- {
- AsyncResultSet ars = await(ses.executeAsync(null, "SELECT * FROM
TEST"));
- await(ars.closeAsync());
-
- assertThrowsSqlException(
- CursorClosedException.class,
- Sql.CURSOR_CLOSED_ERR,
- "Cursor is closed",
- () -> await(ars.fetchNextPage()));
- }
+ @Override
+ protected ResultSet<SqlRow> executeForRead(Session ses, Transaction tx,
String query, Object... args) {
+ return new SyncResultSetAdapter(await(ses.executeAsync(tx, query,
args)));
}
- /**
- * DDL is non-transactional.
- */
- @Test
- public void ddlInTransaction() {
- Session ses = igniteSql().createSession();
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- {
- Transaction tx = igniteTx().begin();
- try {
- assertThrowsSqlException(
- Sql.STMT_VALIDATION_ERR,
- "DDL doesn't support transactions.",
- () -> await(ses.executeAsync(tx, "CREATE TABLE
TEST2(ID INT PRIMARY KEY, VAL0 INT)"))
- );
- } finally {
- tx.rollback();
- }
- }
- {
- Transaction tx = igniteTx().begin();
- AsyncResultSet<SqlRow> res = await(ses.executeAsync(tx, "INSERT
INTO TEST VALUES (?, ?)", -1, -1));
- assertEquals(1, res.affectedRows());
-
- assertThrowsSqlException(
- Sql.STMT_VALIDATION_ERR,
- "DDL doesn't support transactions.",
- () -> await(ses.executeAsync(tx, "CREATE TABLE TEST2(ID
INT PRIMARY KEY, VAL0 INT)"))
- );
- tx.commit();
-
- assertTrue(await(ses.executeAsync(null, "SELECT ID FROM TEST WHERE
ID = -1")).currentPage().iterator().hasNext());
- }
+ @Override
+ protected long[] executeBatch(Session ses, String sql, BatchedArguments
args) {
+ return await(ses.executeBatchAsync(null, sql, args));
}
- @Test
- public void closeSession() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM
TEST"));
-
- await(ses.closeAsync());
-
- // Fetched page is available after cancel.
- ars0.currentPage();
-
- SqlException sqlEx = assertThrowsSqlException(
- Sql.EXECUTION_CANCELLED_ERR,
- "The query was cancelled while executing",
- () -> await(ars0.fetchNextPage()));
-
- assertTrue(IgniteTestUtils.hasCause(sqlEx,
QueryCancelledException.class, null));
- assertThrowsSqlException(Sql.SESSION_CLOSED_ERR, "Session is closed",
() -> await(ses.executeAsync(null, "SELECT ID FROM TEST")));
- }
-
- @Test
- public void batch() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = CLUSTER_NODES.get(0).sql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
-
- BatchedArguments args = BatchedArguments.of(0, 0);
-
- for (int i = 1; i < ROW_COUNT; ++i) {
- args.add(i, i);
- }
-
- long[] batchRes = await(ses.executeBatchAsync(null, "INSERT INTO TEST
VALUES (?, ?)", args));
-
- Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
-
- // Check that data are inserted OK
- List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
- IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i,
res.get(i).get(0)));
+ @Override
+ protected ResultProcessor execute(Integer expectedPages, Transaction tx,
Session ses, String sql, Object... args) {
+ AsyncResultProcessor asyncProcessor = new
AsyncResultProcessor(expectedPages);
+ await(ses.executeAsync(tx, sql, args).thenCompose(asyncProcessor));
- // Check invalid query type
- assertThrowsSqlException(
- SqlBatchException.class,
- Sql.STMT_VALIDATION_ERR,
- "Invalid SQL statement type",
- () -> await(ses.executeBatchAsync(null, "SELECT * FROM TEST",
args)));
-
- assertThrowsSqlException(
- SqlBatchException.class,
- Sql.STMT_VALIDATION_ERR,
- "Invalid SQL statement type",
- () -> await(ses.executeBatchAsync(null, "CREATE TABLE TEST1(ID
INT PRIMARY KEY, VAL0 INT)", args)));
+ return asyncProcessor;
}
- @Test
- public void batchIncomplete() {
- int err = ROW_COUNT / 2;
-
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = CLUSTER_NODES.get(0).sql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
-
- BatchedArguments args = BatchedArguments.of(0, 0);
-
- for (int i = 1; i < ROW_COUNT; ++i) {
- if (i == err) {
- args.add(1, 1);
- } else {
- args.add(i, i);
- }
- }
-
- SqlBatchException ex = assertThrowsSqlException(
- SqlBatchException.class,
- Sql.CONSTRAINT_VIOLATION_ERR,
- "PK unique constraint is violated",
- () -> await(ses.executeBatchAsync(null, "INSERT INTO TEST
VALUES (?, ?)", args))
- );
-
- assertEquals(err, ex.updateCounters().length);
- IntStream.range(0, ex.updateCounters().length).forEach(i ->
assertEquals(1, ex.updateCounters()[i]));
+ @Override
+ protected void rollback(Transaction tx) {
+ await(tx.rollbackAsync());
}
- @Test
- public void resultSetCloseShouldFinishImplicitTransaction() throws
InterruptedException {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null,
"SELECT * FROM TEST");
-
- AsyncResultSet<SqlRow> ars = f.join();
- // There should be a pending transaction since not all data was read.
- boolean txStarted = waitForCondition(() -> txManager().pending() == 1,
5000);
- assertTrue(txStarted, "No pending transactions");
-
- ars.closeAsync().join();
- assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ @Override
+ protected void commit(Transaction tx) {
+ await(tx.commitAsync());
}
- @Test
- public void resultSetFullReadShouldFinishImplicitTransaction() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
-
- // Fetch all data in one read.
- Session ses = sql.sessionBuilder().defaultPageSize(100).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
+ @Override
+ protected void checkDml(int expectedAffectedRows, Transaction tx, Session
ses, String sql, Object... args) {
+ AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args));
- CompletableFuture<AsyncResultSet<SqlRow>> f = ses.executeAsync(null,
"SELECT * FROM TEST");
+ assertFalse(asyncRes.wasApplied());
+ assertFalse(asyncRes.hasMorePages());
+ assertFalse(asyncRes.hasRowSet());
+ assertEquals(expectedAffectedRows, asyncRes.affectedRows());
- AsyncResultSet<SqlRow> ars = f.join();
- assertFalse(ars.hasMorePages());
+ assertNull(asyncRes.metadata());
- assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ await(asyncRes.closeAsync());
}
- private static void checkDdl(boolean expectedApplied, Session ses, String
sql, Transaction tx) {
+ @Override
+ protected void checkDdl(boolean expectedApplied, Session ses, String sql,
Transaction tx) {
CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
tx,
sql
@@ -837,93 +149,56 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
await(asyncRes.closeAsync());
}
- private static void checkDdl(boolean expectedApplied, Session ses, String
sql) {
- checkDdl(expectedApplied, ses, sql, null);
- }
-
- private static <T extends IgniteException> T checkError(Class<T> expCls,
Integer code, String msg, Session ses, String sql,
- Object... args) {
- return assertThrowsPublicException(() -> await(ses.executeAsync(null,
sql, args)), expCls, code, msg);
- }
-
- private static SqlException checkSqlError(
- int code,
- String msg,
- Session ses,
- String sql,
- Object... args
- ) {
- return assertThrowsSqlException(code, msg, () ->
await(ses.executeAsync(null, sql, args)));
- }
-
- protected static void checkDml(int expectedAffectedRows, Session ses,
String sql, Transaction tx, Object... args) {
- AsyncResultSet asyncRes = await(ses.executeAsync(tx, sql, args));
-
- assertFalse(asyncRes.wasApplied());
- assertFalse(asyncRes.hasMorePages());
- assertFalse(asyncRes.hasRowSet());
- assertEquals(expectedAffectedRows, asyncRes.affectedRows());
-
- assertNull(asyncRes.metadata());
-
- await(asyncRes.closeAsync());
- }
-
- private static void checkDml(int expectedAffectedRows, Session ses, String
sql, Object... args) {
- checkDml(expectedAffectedRows, ses, sql, null, args);
- }
-
- static <T extends IgniteException> T assertThrowsPublicException(
- RunnableX executable,
- Class<T> expCls,
- @Nullable Integer code,
- @Nullable String msgPart
- ) {
- T ex = assertThrows(expCls, executable::run);
-
- if (code != null) {
- assertEquals(new IgniteException(code).codeAsString(),
ex.codeAsString());
- }
-
- if (msgPart != null) {
- assertThat(ex.getMessage(), containsString(msgPart));
- }
-
- return ex;
- }
-
- static class TestPageProcessor implements
+ static class AsyncResultProcessor implements ResultProcessor,
Function<AsyncResultSet<SqlRow>,
CompletionStage<AsyncResultSet<SqlRow>>> {
- private int expectedPages;
+ private Integer expectedPages;
+ private long affectedRows;
private final List<SqlRow> res = new ArrayList<>();
- TestPageProcessor(int expectedPages) {
+ AsyncResultProcessor(Integer expectedPages) {
this.expectedPages = expectedPages;
}
@Override
public CompletionStage<AsyncResultSet<SqlRow>>
apply(AsyncResultSet<SqlRow> rs) {
- expectedPages--;
-
- assertTrue(rs.hasRowSet());
assertFalse(rs.wasApplied());
- assertEquals(-1L, rs.affectedRows());
- assertEquals(expectedPages > 0, rs.hasMorePages(),
- "hasMorePages(): [expected=" + (expectedPages > 0) + ",
actual=" + rs.hasMorePages() + ']');
+ //SELECT
+ if (rs.hasRowSet()) {
+ assertEquals(-1L, rs.affectedRows());
+
+ if (expectedPages != null) {
+ expectedPages--;
+ assertEquals(expectedPages > 0, rs.hasMorePages(),
+ "hasMorePages(): [expected=" + (expectedPages > 0)
+ ", actual=" + rs.hasMorePages() + ']');
+ }
- rs.currentPage().forEach(res::add);
+ rs.currentPage().forEach(res::add);
- if (rs.hasMorePages()) {
- return rs.fetchNextPage().thenCompose(this);
+ if (rs.hasMorePages()) {
+ return rs.fetchNextPage().thenCompose(this);
+ }
+ } else { //DML/DDL
+ affectedRows = rs.affectedRows();
+ assertNotEquals(-1L, affectedRows());
}
return rs.closeAsync().thenApply(v -> rs);
}
+ @Override
public List<SqlRow> result() {
//noinspection AssignmentOrReturnOfFieldWithMutableType
+ if (expectedPages != null) {
+ assertEquals(0, expectedPages, "Expected to be read more
pages");
+ }
+
return res;
}
+
+ @Override
+ public long affectedRows() {
+ return affectedRows;
+ }
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 356ce99c5d..425a76f56f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -57,4 +57,10 @@ public class ItSqlClientAsynchronousApiTest extends
ItSqlAsynchronousApiTest {
public void closeSession() {
super.closeSession();
}
+
+ @Override
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20598")
+ public void checkTransactionsWithDml() {
+ super.checkTransactionsWithDml();
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 671f0d3a1d..5bb9e4d9ae 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -21,11 +21,10 @@ import static
org.apache.ignite.internal.runner.app.client.ItAbstractThinClientT
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Disabled;
/**
* Tests for synchronous client SQL API.
@@ -56,19 +55,14 @@ public class ItSqlClientSynchronousApiTest extends
ItSqlSynchronousApiTest {
}
@Override
- @Test
- public void dml() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.createSession();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
+ @Disabled("IGNITE-17134")
+ public void closeSession() {
+ super.closeSession();
+ }
- checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
+ @Override
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20598")
+ public void checkTransactionsWithDml() {
+ super.checkTransactionsWithDml();
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index d616f75134..acc946bb53 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -17,539 +17,81 @@
package org.apache.ignite.internal.sql.api;
-import static
org.apache.ignite.internal.sql.api.ItSqlAsynchronousApiTest.assertThrowsPublicException;
-import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.Arrays;
-import java.util.HashSet;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.lang.ColumnAlreadyExistsException;
-import org.apache.ignite.lang.ColumnNotFoundException;
-import org.apache.ignite.lang.ErrorGroups;
-import org.apache.ignite.lang.ErrorGroups.Index;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
-import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.lang.TableAlreadyExistsException;
-import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.sql.BatchedArguments;
-import org.apache.ignite.sql.CursorClosedException;
-import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.SqlBatchException;
-import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
-import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionOptions;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests for synchronous SQL API.
*/
@SuppressWarnings("ThrowableNotThrown")
-public class ItSqlSynchronousApiTest extends ClusterPerClassIntegrationTest {
- private static final int ROW_COUNT = 16;
-
- @AfterEach
- public void dropTables() {
- for (Table t : CLUSTER_NODES.get(0).tables().tables()) {
- sql("DROP TABLE " + t.name());
- }
+public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest {
+ @Override
+ protected ResultSet<SqlRow> executeForRead(Session ses, Transaction tx,
String query, Object... args) {
+ return ses.execute(tx, query, args);
}
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20096")
- public void ddl() throws Exception {
- IgniteSql sql = igniteSql();
- Session ses = sql.createSession();
-
- // CREATE TABLE
- checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- checkError(
- TableAlreadyExistsException.class,
- ErrorGroups.Table.TABLE_ALREADY_EXISTS_ERR,
- "Table already exists [name=\"PUBLIC\".\"TEST\"]",
- ses,
- "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)"
- );
- checkSqlError(
- ErrorGroups.Table.TABLE_DEFINITION_ERR,
- "Can't create table with duplicate columns: ID, VAL, VAL",
- ses,
- "CREATE TABLE TEST1(ID INT PRIMARY KEY, VAL INT, VAL INT)"
- );
- checkDdl(false, ses, "CREATE TABLE IF NOT EXISTS TEST(ID INT PRIMARY
KEY, VAL VARCHAR)");
-
- // ADD COLUMN
- checkDdl(true, ses, "ALTER TABLE TEST ADD COLUMN VAL1 VARCHAR");
- checkError(
- TableNotFoundException.class,
- ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
- "The table does not exist
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
- ses,
- "ALTER TABLE NOT_EXISTS_TABLE ADD COLUMN VAL1 VARCHAR"
- );
- checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE ADD
COLUMN VAL1 VARCHAR");
- checkError(
- ColumnAlreadyExistsException.class,
- ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR,
- "Column already exists [name=\"VAL1\"]",
- ses,
- "ALTER TABLE TEST ADD COLUMN VAL1 INT"
- );
-
- // CREATE INDEX
- checkDdl(true, ses, "CREATE INDEX TEST_IDX ON TEST(VAL0)");
- checkError(
- IndexAlreadyExistsException.class,
- Index.INDEX_ALREADY_EXISTS_ERR,
- "Index already exists [name=\"PUBLIC\".\"TEST_IDX\"]",
- ses,
- "CREATE INDEX TEST_IDX ON TEST(VAL1)"
- );
- checkDdl(false, ses, "CREATE INDEX IF NOT EXISTS TEST_IDX ON
TEST(VAL1)");
-
- // TODO: IGNITE-19150 We are waiting for schema synchronization to
avoid races to create and destroy indexes
- waitForIndexBuild("TEST", "TEST_IDX");
-
- checkDdl(true, ses, "DROP INDEX TESt_iDX");
- checkDdl(true, ses, "CREATE INDEX TEST_IDX1 ON TEST(VAL0)");
- checkDdl(true, ses, "CREATE INDEX TEST_IDX2 ON TEST(VAL0)");
- checkDdl(true, ses, "CREATE INDEX TEST_IDX3 ON TEST(ID, VAL0, VAL1)");
- checkSqlError(
- Index.INVALID_INDEX_DEFINITION_ERR,
- "Can't create index on duplicate columns: VAL0, VAL0",
- ses,
- "CREATE INDEX TEST_IDX4 ON TEST(VAL0, VAL0)"
- );
-
- checkSqlError(
- Sql.STMT_VALIDATION_ERR,
- "Can`t delete column(s). Column VAL1 is used by indexes
[TEST_IDX3].",
- ses,
- "ALTER TABLE TEST DROP COLUMN val1"
- );
-
- SqlException ex = checkSqlError(
- Sql.STMT_VALIDATION_ERR,
- "Can`t delete column(s).",
- ses,
- "ALTER TABLE TEST DROP COLUMN (val0, val1)"
- );
-
- String msg = ex.getMessage();
- String explainMsg = "Unexpected error message: " + msg;
-
- assertTrue(msg.contains("Column VAL0 is used by indexes ["),
explainMsg);
- assertTrue(msg.contains("TEST_IDX1") && msg.contains("TEST_IDX2") &&
msg.contains("TEST_IDX3"), explainMsg);
- assertTrue(msg.contains("Column VAL1 is used by indexes [TEST_IDX3]"),
explainMsg);
-
- checkSqlError(
- Sql.STMT_VALIDATION_ERR,
- "Can`t delete column, belongs to primary key: [name=ID]",
- ses,
- "ALTER TABLE TEST DROP COLUMN id"
- );
-
- // TODO: IGNITE-19150 We are waiting for schema synchronization to
avoid races to create and destroy indexes
- waitForIndexBuild("TEST", "TEST_IDX3");
- checkDdl(true, ses, "DROP INDEX TESt_iDX3");
-
- // DROP COLUMNS
- checkDdl(true, ses, "ALTER TABLE TEST DROP COLUMN VAL1");
- checkError(
- TableNotFoundException.class,
- ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
- "The table does not exist
[name=\"PUBLIC\".\"NOT_EXISTS_TABLE\"]",
- ses,
- "ALTER TABLE NOT_EXISTS_TABLE DROP COLUMN VAL1"
- );
- checkDdl(false, ses, "ALTER TABLE IF EXISTS NOT_EXISTS_TABLE DROP
COLUMN VAL1");
- checkError(
- ColumnNotFoundException.class,
- ErrorGroups.Table.COLUMN_NOT_FOUND_ERR,
- "Column does not exist [tableName=\"PUBLIC\".\"TEST\",
columnName=\"VAL1\"]",
- ses,
- "ALTER TABLE TEST DROP COLUMN VAL1"
- );
-
- // DROP TABLE
- checkDdl(false, ses, "DROP TABLE IF EXISTS NOT_EXISTS_TABLE");
-
- checkDdl(true, ses, "DROP TABLE TEST");
- checkError(
- TableNotFoundException.class,
- ErrorGroups.Table.TABLE_NOT_FOUND_ERR,
- "The table does not exist [name=\"PUBLIC\".\"TEST\"]",
- ses,
- "DROP TABLE TEST"
- );
-
- checkDdl(false, ses, "DROP INDEX IF EXISTS TEST_IDX");
-
- checkError(
- IndexNotFoundException.class,
- Index.INDEX_NOT_FOUND_ERR,
- "Index does not exist [name=\"PUBLIC\".\"TEST_IDX\"]", ses,
- "DROP INDEX TEST_IDX"
- );
- }
-
- @Test
- public void dml() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.createSession();
-
- TxManager txManagerInternal = txManager();
-
- int txPrevCnt = txManagerInternal.finished();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- checkDml(1, ses, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- assertEquals(ROW_COUNT, txManagerInternal.finished() - txPrevCnt);
-
- assertEquals(0, txManagerInternal.pending());
-
- checkDml(ROW_COUNT, ses, "UPDATE TEST SET VAL0 = VAL0 + ?", 1);
-
- checkDml(ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
- }
-
- @SuppressWarnings("UnstableApiUsage")
- @Test
- public void select() throws Exception {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
4).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST");
-
- Set<Integer> set = new HashSet<>();
-
- rs.forEachRemaining(r -> set.add(r.intValue(0)));
-
- rs.close();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- assertTrue(set.remove(i), "Results invalid: " + rs);
- }
-
- assertTrue(set.isEmpty());
- }
-
- @Test
- public void errors() throws InterruptedException {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT NOT NULL)");
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- // Parse error.
- checkSqlError(Sql.STMT_PARSE_ERR, "Failed to parse query", ses,
"SELECT ID FROM");
-
- // Validation errors.
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'VAL0' does not allow
NULLs", ses,
- "INSERT INTO TEST VALUES (2, NULL)");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Object 'NOT_EXISTING_TABLE'
not found", ses,
- "SELECT * FROM NOT_EXISTING_TABLE");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Column 'NOT_EXISTING_COLUMN'
not found", ses,
- "SELECT NOT_EXISTING_COLUMN FROM TEST");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Multiple statements are not
allowed", ses, "SELECT 1; SELECT 2");
-
- checkSqlError(Sql.STMT_VALIDATION_ERR, "Table without PRIMARY KEY is
not supported", ses,
- "CREATE TABLE TEST2 (VAL INT)");
-
- // Execute error.
- checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "SELECT 1 / ?", 0);
- checkSqlError(Sql.RUNTIME_ERR, "/ by zero", ses, "UPDATE TEST SET val0
= val0/(val0 - ?) + " + ROW_COUNT, 0);
- checkSqlError(Sql.RUNTIME_ERR, "negative substring length not
allowed", ses, "SELECT SUBSTRING('foo', 1, -3)");
-
- // No result set error.
- {
- ResultSet rs = ses.execute(null, "CREATE TABLE TEST3 (ID INT
PRIMARY KEY)");
- assertThrowsSqlException(NoRowSetExpectedException.class,
Sql.QUERY_NO_RESULT_SET_ERR, "Query has no result set", rs::next);
- }
-
- // Cursor closed error.
- {
- ResultSet rs = ses.execute(null, "SELECT * FROM TEST");
- Thread.sleep(300); // ResultSetImpl fetches next page in
background, wait to it to complete to avoid flakiness.
- rs.close();
- assertThrowsSqlException(CursorClosedException.class,
Sql.CURSOR_CLOSED_ERR, "Cursor is closed",
- () -> rs.forEachRemaining(Object::hashCode));
- }
- }
-
- /**
- * DDL is non-transactional.
- */
- @Test
- public void ddlInTransaction() {
- Session ses = igniteSql().createSession();
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- {
- Transaction tx = igniteTx().begin();
- try {
- assertThrowsSqlException(
- Sql.STMT_VALIDATION_ERR,
- "DDL doesn't support transactions.",
- () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT
PRIMARY KEY, VAL0 INT)")
- );
- } finally {
- tx.rollback();
- }
- }
- {
- Transaction tx = igniteTx().begin();
- ResultSet<SqlRow> res = ses.execute(tx, "INSERT INTO TEST VALUES
(?, ?)", -1, -1);
- assertEquals(1, res.affectedRows());
-
- assertThrowsSqlException(
- Sql.STMT_VALIDATION_ERR,
- "DDL doesn't support transactions.",
- () -> ses.execute(tx, "CREATE TABLE TEST2(ID INT PRIMARY
KEY, VAL0 INT)")
- );
- tx.commit();
-
- assertTrue(ses.execute(null, "SELECT ID FROM TEST WHERE ID =
-1").hasNext());
- }
-
- assertEquals(0, ((IgniteImpl)
CLUSTER_NODES.get(0)).txManager().pending());
+ @Override
+ protected long[] executeBatch(Session ses, String sql, BatchedArguments
args) {
+ return ses.executeBatch(null, sql, args);
}
- @ParameterizedTest
- @ValueSource(strings = {
- "INSERT INTO tst VALUES (2, ?)",
- "SELECT * FROM tst WHERE id = ? "
- })
- public void runtimeErrorInDmlCausesTransactionToFail(String query) {
- sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
-
- sql("INSERT INTO tst VALUES (?,?)", 1, 1);
- try (Session ses = igniteSql().createSession()) {
- Transaction tx = igniteTx().begin();
- String dmlQuery = "UPDATE tst SET val = val/(val - ?) + 1";
+ @Override
+ protected ResultProcessor execute(Integer expectedPages, Transaction tx,
Session ses, String sql, Object... args) {
+ SyncPageProcessor syncProcessor = new SyncPageProcessor();
- assertThrowsSqlException(
- Sql.RUNTIME_ERR,
- "/ by zero",
- () -> ses.execute(tx, dmlQuery, 1).affectedRows());
+ ResultSet<SqlRow> rs = ses.execute(tx, sql, args);
+ syncProcessor.process(rs);
- IgniteException err = assertThrows(IgniteException.class, () -> {
- ResultSet<SqlRow> rs = ses.execute(tx, query, 2);
- if (rs.hasRowSet()) {
- assertTrue(rs.hasNext());
- } else {
- assertTrue(rs.wasApplied());
- }
- });
-
- assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
err.code(), err.toString());
- }
+ return syncProcessor;
}
- @ParameterizedTest
- @ValueSource(strings = {
- "INSERT INTO tst VALUES (2, ?)",
- "SELECT * FROM tst WHERE id = ? "
- })
- public void runtimeErrorInQueryCausesTransactionToFail(String query) {
- sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
-
- sql("INSERT INTO tst VALUES (?,?)", 1, 1);
-
- try (Session ses = igniteSql().createSession()) {
- Transaction tx = igniteTx().begin();
-
- assertThrowsSqlException(
- Sql.RUNTIME_ERR,
- "/ by zero",
- () -> ses.execute(tx, "SELECT val/? FROM tst WHERE id=?",
0, 1).next());
+ protected ResultProcessor execute(int expectedPages, Transaction tx,
Session ses, String sql, Object... args) {
+ SyncPageProcessor syncProcessor = new SyncPageProcessor();
- IgniteException err = assertThrows(IgniteException.class, () -> {
- ResultSet<SqlRow> rs = ses.execute(tx, query, 2);
- if (rs.hasRowSet()) {
- assertTrue(rs.hasNext());
- } else {
- assertTrue(rs.wasApplied());
- }
- });
+ ResultSet<SqlRow> rs = ses.execute(tx, sql, args);
+ syncProcessor.process(rs);
- assertEquals(Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
err.code(), err.toString());
- }
+ return syncProcessor;
}
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
- public void testLockIsNotReleasedAfterTxRollback() {
- Ignite ignite = CLUSTER_NODES.get(0);
- IgniteSql sql = ignite.sql();
-
- try (Session ses = ignite.sql().createSession()) {
- ses.execute(null, "CREATE TABLE IF NOT EXISTS tst(id INTEGER
PRIMARY KEY, val INTEGER)").affectedRows();
- }
-
- try (Session session = sql.createSession()) {
- Transaction tx = ignite.transactions().begin();
-
- assertThrows(RuntimeException.class, () -> session.execute(tx,
"SELECT 1/0"));
-
- tx.rollback();
-
- session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
- }
-
- try (Session session = sql.createSession()) {
- Transaction tx = ignite.transactions().begin(new
TransactionOptions().readOnly(false));
-
- session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
-
- tx.commit();
- }
+ @Override
+ protected void rollback(Transaction tx) {
+ tx.rollback();
}
- @Test
- public void batch() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = CLUSTER_NODES.get(0).sql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
-
- BatchedArguments args = BatchedArguments.of(0, 0);
-
- for (int i = 1; i < ROW_COUNT; ++i) {
- args.add(i, i);
- }
-
- long[] batchRes = ses.executeBatch(null, "INSERT INTO TEST VALUES (?,
?)", args);
-
- Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
-
- // Check that data are inserted OK
- List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
- IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i,
res.get(i).get(0)));
-
- // Check invalid query type
- assertThrowsSqlException(
- SqlBatchException.class,
- Sql.STMT_VALIDATION_ERR,
- "Invalid SQL statement type",
- () -> ses.executeBatch(null, "SELECT * FROM TEST", args)
- );
-
- assertThrowsSqlException(
- SqlBatchException.class,
- Sql.STMT_VALIDATION_ERR,
- "Invalid SQL statement type",
- () -> ses.executeBatch(null, "CREATE TABLE TEST1(ID INT
PRIMARY KEY, VAL0 INT)", args)
- );
+ @Override
+ protected void commit(Transaction tx) {
+ tx.commit();
}
- @Test
- public void batchIncomplete() {
- int err = ROW_COUNT / 2;
-
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = CLUSTER_NODES.get(0).sql();
- Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
2).build();
-
- BatchedArguments args = BatchedArguments.of(0, 0);
-
- for (int i = 1; i < ROW_COUNT; ++i) {
- if (i == err) {
- args.add(1, 1);
- } else {
- args.add(i, i);
- }
- }
-
- SqlBatchException batchEx = assertThrows(
- SqlBatchException.class,
- () -> ses.executeBatch(null, "INSERT INTO TEST VALUES (?, ?)",
args)
+ @Override
+ protected void checkDml(int expectedAffectedRows, Transaction tx, Session
ses, String sql, Object... args) {
+ ResultSet res = ses.execute(
+ tx,
+ sql,
+ args
);
- assertEquals(Sql.CONSTRAINT_VIOLATION_ERR, batchEx.code());
- assertEquals(err, batchEx.updateCounters().length);
- IntStream.range(0, batchEx.updateCounters().length).forEach(i ->
assertEquals(1, batchEx.updateCounters()[i]));
- }
-
- @Test
- public void resultSetCloseShouldFinishImplicitTransaction() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteSql sql = igniteSql();
- Session ses = sql.sessionBuilder().defaultPageSize(2).build();
-
- for (int i = 0; i < ROW_COUNT; ++i) {
- ses.execute(null, "INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- ResultSet<?> rs = ses.execute(null, "SELECT * FROM TEST");
- assertEquals(1, txManager().pending());
- rs.close();
- assertEquals(0, txManager().pending(), "Expected no pending
transactions");
- }
-
- @Test
- public void resultSetFullReadShouldFinishImplicitTransaction() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- for (int i = 0; i < ROW_COUNT; ++i) {
- sql("INSERT INTO TEST VALUES (?, ?)", i, i);
- }
-
- IgniteSql sql = igniteSql();
-
- // Fetch all data in one read.
- Session ses = sql.sessionBuilder().defaultPageSize(100).build();
- ResultSet<SqlRow> rs = ses.execute(null, "SELECT * FROM TEST");
-
- while (rs.hasNext()) {
- rs.next();
- }
+ assertFalse(res.wasApplied());
+ assertFalse(res.hasRowSet());
+ assertEquals(expectedAffectedRows, res.affectedRows());
- assertEquals(0, txManager().pending(), "Expected no pending
transactions");
+ res.close();
}
- private static void checkDdl(boolean expectedApplied, Session ses, String
sql) {
+ @Override
+ protected void checkDdl(boolean expectedApplied, Session ses, String sql,
Transaction tx) {
ResultSet res = ses.execute(
- null,
+ tx,
sql
);
@@ -560,32 +102,24 @@ public class ItSqlSynchronousApiTest extends
ClusterPerClassIntegrationTest {
res.close();
}
- private static <T extends IgniteException> T checkError(Class<T> expCls,
Integer code, String msg, Session ses, String sql,
- Object... args) {
- return assertThrowsPublicException(() -> ses.execute(null, sql, args),
expCls, code, msg);
- }
+ static class SyncPageProcessor implements ResultProcessor {
+ private final List<SqlRow> res = new ArrayList<>();
+ private long affectedRows;
- private static SqlException checkSqlError(
- int code,
- String msg,
- Session ses,
- String sql,
- Object... args
- ) {
- return assertThrowsSqlException(code, msg, () -> ses.execute(null,
sql, args));
- }
-
- static void checkDml(int expectedAffectedRows, Session ses, String sql,
Object... args) {
- ResultSet res = ses.execute(
- null,
- sql,
- args
- );
+ @Override
+ public List<SqlRow> result() {
+ //noinspection AssignmentOrReturnOfFieldWithMutableType
+ return res;
+ }
- assertFalse(res.wasApplied());
- assertFalse(res.hasRowSet());
- assertEquals(expectedAffectedRows, res.affectedRows());
+ @Override
+ public long affectedRows() {
+ return affectedRows;
+ }
- res.close();
+ public void process(ResultSet<SqlRow> resultSet) {
+ affectedRows = resultSet.affectedRows();
+ resultSet.forEachRemaining(res::add);
+ }
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index ac9e206455..f46465ea17 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -119,17 +118,16 @@ public class ItCreateTableDdlTest extends
ClusterPerClassIntegrationTest {
* Check invalid colocation columns configuration: - not PK columns; -
duplicates colocation columns.
*/
@Test
- @Disabled("IGNITE-20149")
public void invalidColocationColumns() {
assertThrowsSqlException(
Sql.STMT_VALIDATION_ERR,
- "Colocation columns must be subset of primary key",
+ "Failed to validate query. Colocation column 'VAL' is not part
of PK",
() -> sql("CREATE TABLE T0(ID0 INT, ID1 INT, VAL INT, PRIMARY
KEY (ID1, ID0)) COLOCATE (ID0, VAL)")
);
assertThrowsSqlException(
Sql.STMT_VALIDATION_ERR,
- "Colocation columns contains duplicates: [duplicates=[ID1]]]",
+ "Failed to validate query. Colocation column 'ID1' specified
more that once",
() -> sql("CREATE TABLE T0(ID0 INT, ID1 INT, VAL INT, PRIMARY
KEY (ID1, ID0)) COLOCATE (ID1, ID0, ID1)")
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
new file mode 100644
index 0000000000..a36119ccca
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lang;
+
+import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.TraceableException;
+import org.apache.ignite.sql.SqlException;
+
+/**
+ * This utility class provides an ability to map Ignite internal exceptions to
public SqlException.
+ */
+public class SqlExceptionMapperUtil {
+
+ /**
+ * This method provides a mapping from internal exception to SQL public
ones.
+ *
+ * <p>The rules of mapping are the following:</p>
+ * <ul>
+ * <li>any instance of {@link Error} is returned as is, except {@link
AssertionError}
+ * that will always be mapped to {@link SqlException} with the {@link
Common#INTERNAL_ERR} error code.</li>
+ * <li>any instance of {@link TraceableException} is wrapped into
{@link SqlException}
+ * with the original {@link TraceableException#traceId() traceUd}
and {@link TraceableException#code() code}.</li>
+ * <li>if there are no any mappers that can do a mapping from the
given error to a public exception,
+ * then {@link SqlException} with the {@link Common#INTERNAL_ERR}
error code is returned.</li>
+ * </ul>
+ *
+ * @param origin Exception to be mapped.
+ * @return Public exception.
+ */
+ public static Throwable mapToPublicSqlException(Throwable origin) {
+ Throwable e = mapToPublicException(origin);
+ if (e instanceof Error) {
+ return e;
+ }
+ if (e instanceof SqlException) {
+ return e;
+ }
+
+ if (e instanceof TraceableException) {
+ TraceableException traceable = (TraceableException) e;
+ return new SqlException(traceable.traceId(), traceable.code(),
e.getMessage(), e);
+ }
+
+ return new SqlException(INTERNAL_ERR, origin);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 6810ec8683..f947e6a7af 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.sql.api;
-import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
+import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_CLOSED_ERR;
@@ -192,7 +192,7 @@ public class SessionImpl implements AbstractSession {
)
);
} catch (Exception e) {
- return CompletableFuture.failedFuture(mapToPublicException(e));
+ return CompletableFuture.failedFuture(mapToPublicSqlException(e));
} finally {
busyLock.leaveBusy();
}
@@ -205,7 +205,7 @@ public class SessionImpl implements AbstractSession {
closeInternal();
}
- throw new CompletionException(mapToPublicException(cause));
+ throw new CompletionException(mapToPublicSqlException(cause));
});
}
@@ -282,7 +282,7 @@ public class SessionImpl implements AbstractSession {
throw (CancellationException) cause;
}
- Throwable t = mapToPublicException(cause);
+ Throwable t = mapToPublicSqlException(cause);
if (t instanceof TraceableException) {
throw new SqlBatchException(
@@ -305,7 +305,7 @@ public class SessionImpl implements AbstractSession {
return resFut;
} catch (Exception e) {
- return CompletableFuture.failedFuture(mapToPublicException(e));
+ return CompletableFuture.failedFuture(mapToPublicSqlException(e));
} finally {
busyLock.leaveBusy();
}
@@ -367,7 +367,7 @@ public class SessionImpl implements AbstractSession {
return qryProc.closeSession(sessionId);
} catch (Exception e) {
- return CompletableFuture.failedFuture(mapToPublicException(e));
+ return CompletableFuture.failedFuture(mapToPublicSqlException(e));
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index eaffd75867..b81c88f74c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.sql.engine;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -98,6 +98,6 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
private static Throwable wrapIfNecessary(Throwable t) {
Throwable err = ExceptionUtils.unwrapCause(t);
- return IgniteExceptionMapperUtil.mapToPublicException(err);
+ return SqlExceptionMapperUtil.mapToPublicSqlException(err);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index f82b8807d5..6bbd0a352c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -40,7 +40,7 @@ import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
-import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.MetricManager;
@@ -192,7 +192,7 @@ public class PrepareServiceImpl implements PrepareService {
"Planning of a query aborted due to planner
timeout threshold is reached");
}
- throw new
CompletionException(IgniteExceptionMapperUtil.mapToPublicException(th));
+ throw new
CompletionException(SqlExceptionMapperUtil.mapToPublicSqlException(th));
}
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java
new file mode 100644
index 0000000000..807909ecdc
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/lang/SqlExceptionMapperUtilTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lang;
+
+import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import org.apache.ignite.sql.NoRowSetExpectedException;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests mapping internal exceptions to public SqlException.
+ */
+class SqlExceptionMapperUtilTest {
+ /**
+ * Tests a default mapping of internal exceptions passed from the sql
engine.
+ */
+ @Test
+ public void testSqlInternalExceptionDefaultMapping() {
+ CustomNoMappingException internalSqlErr = new
CustomNoMappingException(EXECUTION_CANCELLED_ERR);
+ Throwable mappedErr = mapToPublicSqlException(internalSqlErr);
+
+ assertThat(mappedErr, instanceOf(SqlException.class));
+
+ SqlException mappedSqlErr = (SqlException) mappedErr;
+
+ assertThat("Mapped exception should have the same trace identifier.",
mappedSqlErr.traceId(), is(internalSqlErr.traceId()));
+ assertThat("Mapped exception shouldn't have the same error code.",
mappedSqlErr.code(), is(INTERNAL_ERR));
+ }
+
+ /**
+ * Tests a default mapping of internal exceptions passed from the sql
engine.
+ */
+ @Test
+ public void testSqlInternalExceptionDefaultMappingForSqlException() {
+ NoRowSetExpectedException sqlErr = new NoRowSetExpectedException();
+
+ Throwable mappedErr = mapToPublicSqlException(sqlErr);
+
+ assertSame(sqlErr, mappedErr);
+ }
+
+ /**
+ * Test exception.
+ */
+ public static class CustomNoMappingException extends
IgniteInternalException {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates a new instance of CustomNoMappingException with given code.
+ */
+ public CustomNoMappingException(int code) {
+ super(code, "Test internal exception [err=no mapping]");
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
index acb0e4f73d..7c7f1e71a6 100644
---
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
+++
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/SqlTestUtils.java
@@ -34,9 +34,12 @@ import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.sql.engine.type.UuidType;
@@ -91,6 +94,11 @@ public class SqlTestUtils {
return ex;
}
+ public static <T> Stream<T> asStream(Iterator<T> sourceIterator) {
+ Iterable<T> iterable = () -> sourceIterator;
+ return StreamSupport.stream(iterable.spliterator(), false);
+ }
+
/**
* Convert {@link ColumnType} to string representation of SQL type.
*