This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 89eb752c99 IGNITE-19493 Sql. Change query execution flow - Fixes #2162.
89eb752c99 is described below
commit 89eb752c9981f0880de70249329637196558bb46
Author: zstan <[email protected]>
AuthorDate: Thu Jun 8 10:56:07 2023 +0300
IGNITE-19493 Sql. Change query execution flow - Fixes #2162.
Signed-off-by: zstan <[email protected]>
---
.../ignite/lang/SchemaNotFoundException.java | 49 ++++++++++
.../ignite/internal/sql/api/ItCommonApiTest.java | 100 +++++++++++++++++++++
.../types/timestamp/test_incorrect_timestamp.test | 1 -
.../internal/sql/engine/SqlQueryProcessor.java | 37 +++++---
.../prepare/ddl/DdlSqlToCommandConverter.java | 4 +-
.../internal/sql/engine/schema/IgniteSchema.java | 2 +-
.../sql/engine/schema/SqlSchemaManagerImpl.java | 6 +-
.../engine/framework/PredefinedSchemaManager.java | 2 +-
8 files changed, 179 insertions(+), 22 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java b/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java
new file mode 100644
index 0000000000..559bb091d6
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/lang/SchemaNotFoundException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
+
+import java.util.UUID;
+
+/**
+ * Exception is thrown when a specified schema cannot be found.
+ */
+public class SchemaNotFoundException extends IgniteException {
+ /**
+ * Creates an exception with the given schema name.
+ *
+ * @param schemaName Schema name.
+ */
+ public SchemaNotFoundException(String schemaName) {
+ super(SCHEMA_NOT_FOUND_ERR, format("Schema not found [schemaName={}]",
schemaName));
+ }
+
+ /**
+ * Creates an exception with the given trace ID, error code, detailed message, and cause.
+ *
+ * @param traceId Unique identifier of the exception.
+ * @param code Full error code.
+ * @param message Detailed message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public SchemaNotFoundException(UUID traceId, int code, String message,
Throwable cause) {
+ super(traceId, code, message, cause);
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 1b535794d0..326177814c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -23,11 +23,14 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.lang.ErrorGroups.Sql;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
@@ -38,7 +41,12 @@ import
org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
@@ -46,6 +54,8 @@ import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/** Test common SQL API. */
@@ -168,4 +178,94 @@ public class ItCommonApiTest extends
ClusterPerClassIntegrationTest {
assertEquals(ins, res.next().timestampValue(3));
}
}
+
+ /** Check transaction change status with erroneous statements. */
+ @Test
+ public void testTxStateChangedOnErroneousOp() {
+ sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ // TODO: need to be refactored after https://issues.apache.org/jira/browse/IGNITE-19663
+ TxManager txManagerInternal =
+ (TxManager)
IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class,
"txManager");
+
+ SqlQueryProcessor queryProc =
+ (SqlQueryProcessor)
IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class,
"qryEngine");
+
+ SqlSchemaManager oldManager =
+ (SqlSchemaManager) IgniteTestUtils.getFieldValue(queryProc,
SqlQueryProcessor.class, "sqlSchemaManager");
+
+ int txPrevCnt = txManagerInternal.finished();
+
+ Transaction tx = CLUSTER_NODES.get(0).transactions().begin();
+
+ try {
+ sql(tx, "INSERT INTO PUBLIC.TEST VALUES(1, 1)");
+ sql(tx, "INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
+ } catch (Throwable ignore) {
+ // No op.
+ }
+
+ assertEquals(0, txManagerInternal.finished() - txPrevCnt);
+ InternalTransaction tx0 = (InternalTransaction) tx;
+ assertNull(tx0.state());
+
+ tx.rollback();
+ assertEquals(1, txManagerInternal.finished() - txPrevCnt);
+
+ sql("INSERT INTO TEST VALUES(1, 1)");
+ assertEquals(2, txManagerInternal.finished() - txPrevCnt);
+
+ var schemaManager = new ErroneousSchemaManager();
+
+ // TODO: refactor after https://issues.apache.org/jira/browse/IGNITE-17694
+ IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager",
schemaManager);
+
+ try {
+ sql("SELECT a FROM NOTEXIST.TEST");
+ } catch (Throwable ignore) {
+ // No op.
+ }
+
+ try {
+ sql("INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
+ } catch (Throwable ignore) {
+ // No op.
+ }
+
+ assertEquals(2, txManagerInternal.finished() - txPrevCnt);
+
+ IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager",
oldManager);
+ }
+
+ private static class ErroneousSchemaManager implements SqlSchemaManager {
+ /** {@inheritDoc} */
+ @Override
+ public SchemaPlus schema(@Nullable String schema) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SchemaPlus schema(@Nullable String name, int version) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteTable tableById(int id) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<SchemaPlus> actualSchemaAsync(long ver) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SchemaPlus activeSchema(@Nullable String name, long timestamp) {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git a/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test b/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
index 8901fc6e3b..cc64d410c1 100644
--- a/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
+++ b/modules/runner/src/integrationTest/sql/types/timestamp/test_incorrect_timestamp.test
@@ -1,7 +1,6 @@
# name: test/sql/types/timestamp/test_incorrect_timestamp.test
# description: Test out of range/incorrect timestamp formats
# group: [timestamp]
-# Ignore https://issues.apache.org/jira/browse/IGNITE-15623
statement ok
PRAGMA enable_verification
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 526c472b6c..64d1ffaf0c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine;
import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.lang.ErrorGroups.Sql.OPERATION_INTERRUPTED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_EXPIRED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_NOT_FOUND_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
@@ -35,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.lang.SchemaNotFoundException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.NotNull;
@@ -386,13 +387,6 @@ public class SqlQueryProcessor implements QueryProcessor {
String schemaName =
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
- SchemaPlus schema = sqlSchemaManager.schema(schemaName);
-
- if (schema == null) {
- return CompletableFuture.failedFuture(
- new IgniteInternalException(SCHEMA_NOT_FOUND_ERR,
format("Schema not found [schemaName={}]", schemaName)));
- }
-
InternalTransaction outerTx =
context.unwrap(InternalTransaction.class);
QueryCancel queryCancel = new QueryCancel();
@@ -413,6 +407,8 @@ public class SqlQueryProcessor implements QueryProcessor {
CompletableFuture<Void> start = new CompletableFuture<>();
+ AtomicReference<InternalTransaction> tx = new AtomicReference<>();
+
CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
.thenApply(v -> {
StatementParseResult parseResult =
IgniteSqlParser.parse(sql, StatementParseResult.MODE);
@@ -425,6 +421,16 @@ public class SqlQueryProcessor implements QueryProcessor {
.thenCompose(sqlNode -> {
boolean rwOp = dataModificationOp(sqlNode);
+ boolean implicitTxRequired = outerTx == null;
+
+ tx.set(implicitTxRequired ? txManager.begin(!rwOp) :
outerTx);
+
+ SchemaPlus schema = sqlSchemaManager.schema(schemaName);
+
+ if (schema == null) {
+ return CompletableFuture.failedFuture(new
SchemaNotFoundException(schemaName));
+ }
+
BaseQueryContext ctx = BaseQueryContext.builder()
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
@@ -439,11 +445,7 @@ public class SqlQueryProcessor implements QueryProcessor {
return prepareSvc.prepareAsync(sqlNode, ctx)
.thenApply(plan -> {
- boolean implicitTxRequired = outerTx == null;
-
- InternalTransaction tx = implicitTxRequired ?
txManager.begin(!rwOp) : outerTx;
-
- var dataCursor = executionSrvc.executePlan(tx,
plan, ctx);
+ var dataCursor =
executionSrvc.executePlan(tx.get(), plan, ctx);
SqlQueryType queryType = plan.type();
assert queryType != null : "Expected a full
plan but got a fragment: " + plan;
@@ -451,7 +453,7 @@ public class SqlQueryProcessor implements QueryProcessor {
return new AsyncSqlCursorImpl<>(
queryType,
plan.metadata(),
- implicitTxRequired ? tx : null,
+ implicitTxRequired ? tx.get() : null,
new AsyncCursor<List<Object>>() {
@Override
public
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
@@ -475,6 +477,13 @@ public class SqlQueryProcessor implements QueryProcessor {
if (ex instanceof CancellationException) {
queryCancel.cancel();
}
+
+ if (ex != null && outerTx == null) {
+ InternalTransaction tx0 = tx.get();
+ if (tx0 != null) {
+ tx0.rollback();
+ }
+ }
});
start.completeAsync(() -> null, taskExecutor);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
index 2447f950d7..9bc1b486fb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/ddl/DdlSqlToCommandConverter.java
@@ -32,7 +32,6 @@ import static
org.apache.ignite.lang.ErrorGroups.Sql.PRIMARY_KEYS_MULTIPLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.PRIMARY_KEY_MISSING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_NOT_FOUND_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SQL_TO_REL_CONVERSION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Sql.STORAGE_ENGINE_NOT_VALID_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
@@ -96,6 +95,7 @@ import
org.apache.ignite.internal.sql.engine.sql.IgniteSqlIndexType;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlZoneOption;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.SchemaNotFoundException;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
@@ -709,7 +709,7 @@ public class DdlSqlToCommandConverter {
private void ensureSchemaExists(PlanningContext ctx, String schemaName) {
if (ctx.catalogReader().getRootSchema().getSubSchema(schemaName, true)
== null) {
- throw new SqlException(SCHEMA_NOT_FOUND_ERR, "Schema with name " +
schemaName + " not found");
+ throw new SchemaNotFoundException(schemaName);
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 9aa3ef3b44..27320aa2a8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
* Ignite schema.
*/
public class IgniteSchema extends AbstractSchema {
- public static final long INITIAL_VERSION = -1;
+ static final long INITIAL_VERSION = -1;
private final String schemaName;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 816f0cbc3b..b7c32e5c74 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -149,11 +149,11 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
/** {@inheritDoc} */
@Override
public SchemaPlus schema(@Nullable String schema) {
- SchemaPlus schemaPlus = calciteSchemaVv.latest();
-
// stub for waiting pk indexes, more clear place is IgniteSchema
CompletableFuture.allOf(pkIdxReady.values().toArray(CompletableFuture[]::new)).join();
+ SchemaPlus schemaPlus = calciteSchemaVv.latest();
+
return schema != null ? schemaPlus.getSubSchema(schema) :
schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
}
@@ -171,7 +171,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
}
try {
if (ver == IgniteSchema.INITIAL_VERSION) {
- return completedFuture(null);
+ return completedFuture(calciteSchemaVv.latest());
}
CompletableFuture<SchemaPlus> lastSchemaFut;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index 73ddc9098b..c98b745510 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -81,7 +81,7 @@ public class PredefinedSchemaManager implements
SqlSchemaManager {
/** {@inheritDoc} */
@Override
public CompletableFuture<?> actualSchemaAsync(long ver) {
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(root);
}
/** {@inheritDoc} */