This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0a0be8d223 IGNITE-18878 Sql. Introduce schema synchronisation for
query processing, naive fix - Fixes #1754.
0a0be8d223 is described below
commit 0a0be8d2236f42a95f8ba2c593ab14a0bcdb169c
Author: zstan <[email protected]>
AuthorDate: Thu Mar 16 15:08:39 2023 +0300
IGNITE-18878 Sql. Introduce schema synchronisation for query processing,
naive fix - Fixes #1754.
Signed-off-by: zstan <[email protected]>
---
.../ignite/internal/causality/VersionedValue.java | 2 +-
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 -
.../apache/ignite/internal/index/IndexManager.java | 8 +-
.../internal/index/event/IndexEventParameters.java | 2 +-
.../internal/runner/app/ItDataSchemaSyncTest.java | 47 ++++++-
.../runner/app/ItIgniteNodeRestartTest.java | 66 +++++++++-
.../internal/sql/engine/ItFunctionsTest.java | 2 +-
.../internal/sql/engine/ItMixedQueriesTest.java | 2 -
.../internal/sql/engine/SqlQueryProcessor.java | 4 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 94 +++++++++-----
.../sql/engine/externalize/RelInputEx.java | 8 --
.../sql/engine/externalize/RelJsonReader.java | 42 ++-----
.../sql/engine/message/QueryStartRequest.java | 5 +
.../internal/sql/engine/rel/IgniteTableModify.java | 8 +-
.../engine/rel/ProjectableFilterableTableScan.java | 14 +--
.../internal/sql/engine/schema/IgniteSchema.java | 28 ++++-
.../sql/engine/schema/SqlSchemaManager.java | 12 +-
.../sql/engine/schema/SqlSchemaManagerImpl.java | 92 ++++++++------
.../internal/sql/engine/util/BaseQueryContext.java | 18 ++-
.../sql/engine/util/HashFunctionFactoryImpl.java | 2 +-
.../internal/sql/engine/StopCalciteModuleTest.java | 18 ++-
.../sql/engine/exec/ExecutionServiceImplTest.java | 30 +++--
.../engine/exec/schema/SqlSchemaManagerTest.java | 140 +++------------------
.../sql/engine/externalize/RelJsonReaderTest.java | 77 ------------
.../engine/framework/PredefinedSchemaManager.java | 12 +-
.../sql/engine/framework/TestBuilders.java | 2 +-
.../sql/engine/planner/AbstractPlannerTest.java | 24 +++-
27 files changed, 381 insertions(+), 381 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index efca7c5fa1..d309379728 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -49,7 +49,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class VersionedValue<T> {
/** Token until the value is initialized. */
- private static final long NOT_INITIALIZED = -1L;
+ public static final long NOT_INITIALIZED = -1L;
/** Default history size. */
private static final int DEFAULT_MAX_HISTORY_SIZE = 10;
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 07e5e5ca37..0ce7e3f6b2 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -157,9 +157,6 @@ public class ErrorGroups {
/** Object already exists in schema. */
public static final int OBJECT_ALREADY_EXISTS_ERR =
SQL_ERR_GROUP.registerErrorCode(17);
- /** Table version not found. */
- public static final int TABLE_VER_NOT_FOUND_ERR =
SQL_ERR_GROUP.registerErrorCode(18);
-
/** Query mapping error. */
public static final int QUERY_MAPPING_ERR =
SQL_ERR_GROUP.registerErrorCode(19);
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 424df53627..e6439336c1 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -233,7 +233,7 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
return future;
} catch (Exception ex) {
- return CompletableFuture.failedFuture(ex);
+ return failedFuture(ex);
} finally {
busyLock.leaveBusy();
}
@@ -253,7 +253,7 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
boolean failIfNotExists
) {
if (!busyLock.enterBusy()) {
- return CompletableFuture.failedFuture(new NodeStoppingException());
+ return failedFuture(new NodeStoppingException());
}
LOG.debug("Going to drop index [schema={}, index={}]", schemaName,
indexName);
@@ -547,7 +547,7 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
String newName,
ConfigurationNotificationEvent<TableIndexView> ctx
) {
- return CompletableFuture.failedFuture(new
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-16196"));
+ return failedFuture(new
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-16196"));
}
/** {@inheritDoc} */
@@ -559,7 +559,7 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onUpdate(@NotNull
ConfigurationNotificationEvent<TableIndexView> ctx) {
- return CompletableFuture.failedFuture(new
IllegalStateException("Should not be called"));
+ return failedFuture(new IllegalStateException("Should not be
called"));
}
}
}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
index 3324a2463c..f9f244a23c 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
@@ -59,7 +59,7 @@ public class IndexEventParameters extends EventParameters {
* @param indexId An index identifier.
* @param index An index instance.
*/
- protected IndexEventParameters(long revision, UUID indexId, @Nullable
Index<?> index) {
+ private IndexEventParameters(long revision, UUID indexId, @Nullable
Index<?> index) {
super(revision);
this.indexId = indexId;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 10c2abe728..3771263772 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -27,11 +27,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
@@ -187,6 +190,44 @@ public class ItDataSchemaSyncTest extends
IgniteAbstractTest {
assertTrue(waitForCondition(() ->
table1.schemaView().schema().version() == 2, 5_000));
}
+ /**
+ * Checks that an SQL query waits until the appropriate schema is
propagated to all nodes.
+ */
+ @Test
+ public void queryWaitAppropriateSchema() throws Exception {
+ Ignite ignite0 = clusterNodes.get(0);
+ IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+
+ createTable(ignite0, TABLE_NAME);
+
+ WatchListenerInhibitor listenerInhibitor =
WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
+
+ listenerInhibitor.startInhibit();
+
+ sql(ignite0, "CREATE INDEX idx1 ON " + TABLE_NAME + "(valint)");
+
+ CompletableFuture<Void> fut = CompletableFuture.runAsync(() ->
sql(ignite0, "SELECT * FROM "
+ + TABLE_NAME + " WHERE valint > 0"));
+
+ try {
+ // Wait for the timeout to observe that the query can't be executed.
+ fut.get(1, TimeUnit.SECONDS);
+
+ fail();
+ } catch (TimeoutException e) {
+ // Expected, no op.
+ }
+
+ listenerInhibitor.stopInhibit();
+
+ // Only check that the request is executed without a timeout.
+ ResultSet<SqlRow> rs = sql(ignite0, "SELECT * FROM " + TABLE_NAME + "
WHERE valint > 0");
+
+ assertNotNull(rs);
+
+ rs.close();
+ }
+
/**
* Test correctness of schemes recovery after node restart.
*/
@@ -314,9 +355,11 @@ public class ItDataSchemaSyncTest extends
IgniteAbstractTest {
sql(node, "ALTER TABLE " + tableName + " ADD COLUMN valstr2 VARCHAR
NOT NULL DEFAULT 'default'");
}
- protected void sql(Ignite node, String query, Object... args) {
+ protected ResultSet<SqlRow> sql(Ignite node, String query, Object... args)
{
+ ResultSet<SqlRow> rs = null;
try (Session session = node.sql().createSession()) {
- session.execute(null, query, args);
+ rs = session.execute(null, query, args);
}
+ return rs;
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 23eafef4d1..548e113b96 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -113,7 +113,9 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
@@ -663,6 +665,68 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
assertEquals(DEFAULT_NODE_PORT, nodePort);
}
+ /**
+ * Checks the correctness of the returned results after a node restart.
+ * Scenario:
+ * <ol>
+ * <li>Start two nodes and fill the data.</li>
+ * <li>Create index.</li>
+ * <li>Check that the explain plan contains an index scan.</li>
+ * <li>Check the returned results.</li>
+ * <li>Restart one node.</li>
+ * <li>Run query and compare results.</li>
+ * </ol>
+ */
+ @Test
+ public void testQueryCorrectnessAfterNodeRestart() throws
InterruptedException {
+ IgniteImpl ignite1 = startNode(0);
+
+ createTableWithoutData(ignite1, TABLE_NAME, 2, 1);
+
+ IgniteImpl ignite2 = startNode(1);
+
+ String sql = "SELECT id FROM " + TABLE_NAME + " WHERE id > 0 ORDER BY
id";
+
+ int intRes;
+
+ try (Session session1 = ignite1.sql().createSession(); Session
session2 = ignite2.sql().createSession()) {
+ session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME +
"(id)");
+
+ waitForIndex(List.of(ignite1, ignite2), "idx1");
+
+ createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1);
+
+ ResultSet<SqlRow> plan = session1.execute(null, "EXPLAIN PLAN FOR
" + sql);
+
+ String planStr = plan.next().stringValue(0);
+
+ assertTrue(planStr.contains("IndexScan"));
+
+ ResultSet<SqlRow> res1 = session1.execute(null, sql);
+
+ ResultSet<SqlRow> res2 = session2.execute(null, sql);
+
+ intRes = res1.next().intValue(0);
+
+ assertEquals(intRes, res2.next().intValue(0));
+
+ res1.close();
+
+ res2.close();
+ }
+
+ // TODO: Uncomment after IGNITE-18203
+ /*stopNode(0);
+
+ ignite1 = startNode(0);
+
+ try (Session session1 = ignite1.sql().createSession()) {
+ ResultSet<SqlRow> res3 = session1.execute(null, sql);
+
+ assertEquals(intRes, res3.next().intValue(0));
+ }*/
+ }
+
/**
* Restarts a node with changing configuration.
*/
@@ -1055,7 +1119,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
private void createTableWithData(List<IgniteImpl> nodes, String name, int
replicas, int partitions)
throws InterruptedException {
try (Session session = nodes.get(0).sql().createSession()) {
- session.execute(null, "CREATE TABLE " + name
+ session.execute(null, "CREATE TABLE IF NOT EXISTS " + name
+ "(id INT PRIMARY KEY, name VARCHAR) WITH replicas=" +
replicas + ", partitions=" + partitions);
waitForIndex(nodes, name + "_PK");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
index 827158a6d2..309bcfc1ed 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.Test;
* Test Ignite SQL functions.
*/
public class ItFunctionsTest extends ClusterPerClassIntegrationTest {
- private static final Object[] NULL_RESULT = new Object[] { null };
+ private static final Object[] NULL_RESULT = { null };
@Test
public void testTimestampDiffWithFractionsOfSecond() {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
index fa249c48d7..74cab4acc8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
@@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -208,7 +207,6 @@ public class ItMixedQueriesTest extends
ClusterPerClassIntegrationTest {
* Verifies that table modification events are passed to a calcite schema
modification listener.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16679")
public void testIgniteSchemaAwaresAlterTableCommand() {
String selectAllQry = "select * from test_tbl";
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index c4ea27f00a..8d084d192c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -100,10 +100,10 @@ public class SqlQueryProcessor implements QueryProcessor {
private static final long PLANNER_TIMEOUT = 15000L;
/** Size of the cache for query plans. */
- public static final int PLAN_CACHE_SIZE = 1024;
+ private static final int PLAN_CACHE_SIZE = 1024;
/** Session expiration check period in milliseconds. */
- public static final long SESSION_EXPIRE_CHECK_PERIOD =
TimeUnit.SECONDS.toMillis(1);
+ private static final long SESSION_EXPIRE_CHECK_PERIOD =
TimeUnit.SECONDS.toMillis(1);
/** Name of the default schema. */
public static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 817259d19f..fd4ffaa2c1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -245,8 +245,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.build();
}
- private QueryPlan prepareFragment(String jsonFragment) {
- IgniteRel plan = physNodesCache.computeIfAbsent(jsonFragment, ser ->
fromJson(sqlSchemaManager, ser));
+ private QueryPlan prepareFragment(String jsonFragment, BaseQueryContext
ctx) {
+ IgniteRel plan = physNodesCache.computeIfAbsent(jsonFragment, ser ->
fromJson(ctx, ser));
return new FragmentPlan(plan);
}
@@ -260,8 +260,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
case DML:
// TODO a barrier between previous operation and this one
case QUERY:
- return executeQuery(tx, ctx, (MultiStepPlan) plan
- );
+ return executeQuery(tx, ctx, (MultiStepPlan) plan);
case EXPLAIN:
return executeExplain((ExplainPlan) plan);
case DDL:
@@ -321,13 +320,20 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private void onMessage(String nodeName, QueryStartRequest msg) {
assert nodeName != null && msg != null;
- DistributedQueryManager queryManager =
queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
- BaseQueryContext ctx = createQueryContext(key, msg.schema(),
msg.parameters());
+ CompletableFuture<?> fut =
sqlSchemaManager.actualSchemaAsync(msg.schemaVersion());
- return new DistributedQueryManager(ctx);
- });
+ if (fut.isDone()) {
+ submitFragment(nodeName, msg);
+ } else {
+ fut.whenComplete((mgr, ex) -> {
+ if (ex != null) {
+ handleError(ex, nodeName, msg);
+ return;
+ }
- queryManager.submitFragment(nodeName, msg.root(),
msg.fragmentDescription(), msg.txAttributes());
+ taskExecutor.execute(msg.queryId(), msg.fragmentId(), () ->
submitFragment(nodeName, msg));
+ });
+ }
}
private void onMessage(String nodeName, QueryStartResponse msg) {
@@ -395,6 +401,28 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return mgr.localFragments();
}
+ private void submitFragment(String nodeName, QueryStartRequest msg) {
+ DistributedQueryManager queryManager = getOrCreateQueryManager(msg);
+
+ queryManager.submitFragment(nodeName, msg.root(),
msg.fragmentDescription(), msg.txAttributes());
+ }
+
+ private void handleError(Throwable ex, String nodeName, QueryStartRequest
msg) {
+ DistributedQueryManager queryManager = getOrCreateQueryManager(msg);
+
+ queryManager.handleError(ex, nodeName,
msg.fragmentDescription().fragmentId());
+ }
+
+ private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest
msg) {
+ DistributedQueryManager queryManager =
queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
+ BaseQueryContext ctx = createQueryContext(key, msg.schema(),
msg.parameters());
+
+ return new DistributedQueryManager(ctx);
+ });
+
+ return queryManager;
+ }
+
/**
* A convenient class that manages the initialization and termination of
distributed queries.
*/
@@ -414,9 +442,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private volatile Long rootFragmentId = null;
- private DistributedQueryManager(
- BaseQueryContext ctx
- ) {
+ private DistributedQueryManager(BaseQueryContext ctx) {
this.ctx = ctx;
var root = new CompletableFuture<AsyncRootNode<RowT,
List<Object>>>();
@@ -445,6 +471,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.fragmentDescription(desc)
.parameters(ctx.parameters())
.txAttributes(txAttributes)
+ .schemaVersion(ctx.schemaVersion())
.build();
var fut = new CompletableFuture<Void>();
@@ -554,30 +581,39 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
);
}
- private void submitFragment(String initiatorNode, String
fragmentString, FragmentDescription desc, TxAttributes txAttributes) {
+ private void submitFragment(
+ String initiatorNode,
+ String fragmentString,
+ FragmentDescription desc,
+ TxAttributes txAttributes
+ ) {
try {
- QueryPlan qryPlan = prepareFragment(fragmentString);
+ QueryPlan qryPlan = prepareFragment(fragmentString, ctx);
FragmentPlan plan = (FragmentPlan) qryPlan;
executeFragment(plan, createContext(initiatorNode, desc,
txAttributes));
} catch (Throwable ex) {
- LOG.debug("Unable to start query fragment", ex);
+ handleError(ex, initiatorNode, desc.fragmentId());
+ }
+ }
- try {
- msgSrvc.send(
- initiatorNode,
- FACTORY.queryStartResponse()
- .queryId(ctx.queryId())
- .fragmentId(desc.fragmentId())
- .error(ex)
- .build()
- );
- } catch (Exception e) {
- LOG.info("Unable to send error message", e);
-
- close(true);
- }
+ private void handleError(Throwable ex, String initiatorNode, long
fragmentId) {
+ LOG.debug("Unable to start query fragment", ex);
+
+ try {
+ msgSrvc.send(
+ initiatorNode,
+ FACTORY.queryStartResponse()
+ .queryId(ctx.queryId())
+ .fragmentId(fragmentId)
+ .error(ex)
+ .build()
+ );
+ } catch (Exception e) {
+ LOG.info("Unable to send error message", e);
+
+ close(true);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
index a614438074..15b0e9f2eb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.externalize;
import java.util.List;
-import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelInput;
import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
@@ -37,13 +36,6 @@ public interface RelInputEx extends RelInput {
*/
RelCollation getCollation(String tag);
- /**
- * Returns table by its id.
- *
- * @return A table with given id.
- */
- RelOptTable getTableById();
-
/**
* Returns search bounds.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index 645256b1a9..dd929a2ac2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -30,12 +30,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.UUID;
import java.util.function.Function;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
@@ -50,8 +49,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.sql.SqlException;
@@ -61,15 +59,14 @@ import org.apache.ignite.sql.SqlException;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class RelJsonReader {
- private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF
= new TypeReference<>() {
- };
+ private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF
= new TypeReference<>() {};
private final ObjectMapper mapper = new
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
- private final SqlSchemaManager schemaManager;
-
private final RelJson relJson;
+ private final RelOptSchema relOptSchema;
+
private final Map<String, RelNode> relMap = new LinkedHashMap<>();
private RelNode lastRel;
@@ -78,8 +75,8 @@ public class RelJsonReader {
* FromJson.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public static <T extends RelNode> T fromJson(SqlSchemaManager
schemaManager, String json) {
- RelJsonReader reader = new RelJsonReader(schemaManager);
+ public static <T extends RelNode> T fromJson(BaseQueryContext ctx, String
json) {
+ RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
return (T) reader.read(json);
}
@@ -88,8 +85,8 @@ public class RelJsonReader {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public RelJsonReader(SqlSchemaManager schemaManager) {
- this.schemaManager = schemaManager;
+ public RelJsonReader(RelOptSchema relOptSchema) {
+ this.relOptSchema = relOptSchema;
relJson = new RelJson();
}
@@ -147,25 +144,8 @@ public class RelJsonReader {
/** {@inheritDoc} */
@Override
public RelOptTable getTable(String table) {
- // For deserialization #getTableById() should be used instead
because
- // it's the only way to find out that someone just recreate the
table
- // (probably with different schema) with the same name while the
plan
- // was serialized
- throw new AssertionError("Unexpected method was called");
- }
-
- /** {@inheritDoc} */
- @Override
- public RelOptTable getTableById() {
- String tableId = getString("tableId");
- int ver = ((Number) get("tableVer")).intValue();
-
- IgniteTable table =
schemaManager.tableById(UUID.fromString(tableId), ver);
-
- List<String> tableName = getStringList("table");
-
- return RelOptTableImpl.create(null,
table.getRowType(Commons.typeFactory()), tableName,
- table, c -> null);
+ List<String> list = getStringList(table);
+ return relOptSchema.getTableForMember(list);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index d8f000b490..f104188038 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -54,4 +54,9 @@ public interface QueryStartRequest extends
ExecutionContextAwareMessage {
*/
@Marshallable
TxAttributes txAttributes();
+
+ /**
+ * Returns the last schema version. This is just a stub that needs to be removed after
IGNITE-18733.
+ */
+ long schemaVersion();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
index e78d754877..e4b3fc3bf6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.sql.engine.externalize.RelInputEx;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -77,7 +76,7 @@ public class IgniteTableModify extends TableModify implements
IgniteRel {
this(
input.getCluster(),
input.getTraitSet().replace(IgniteConvention.INSTANCE),
- ((RelInputEx) input).getTableById(),
+ input.getTable("table"),
input.getInput(),
input.getEnum("operation", Operation.class),
input.getStringList("updateColumnList"),
@@ -115,10 +114,9 @@ public class IgniteTableModify extends TableModify
implements IgniteRel {
@Override
public RelWriter explainTerms(RelWriter pw) {
+ // Needed to obtain the correct rel from ExecutionServiceImpl#physNodesCache.
return super.explainTerms(pw)
.itemIf("tableId",
getTable().unwrap(IgniteTable.class).id().toString(),
- pw.getDetailLevel() == ALL_ATTRIBUTES)
- .itemIf("tableVer",
getTable().unwrap(IgniteTable.class).version(),
- pw.getDetailLevel() == ALL_ATTRIBUTES);
+ pw.getDetailLevel() == ALL_ATTRIBUTES);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
index 516cb77249..e264aded24 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
@@ -44,7 +44,6 @@ import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.ControlFlowException;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.sql.engine.externalize.RelInputEx;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -88,13 +87,7 @@ public abstract class ProjectableFilterableTableScan extends
TableScan {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
protected ProjectableFilterableTableScan(RelInput input) {
- super(
- input.getCluster(),
- input.getTraitSet(),
- List.of(),
- ((RelInputEx) input).getTableById()
- );
-
+ super(input);
condition = input.getExpression("filters");
projects = input.get("projects") == null ? null :
input.getExpressionList("projects");
requiredColumns = input.get("requiredColumns") == null ? null :
input.getBitSet("requiredColumns");
@@ -135,10 +128,7 @@ public abstract class ProjectableFilterableTableScan
extends TableScan {
return explainTerms0(pw
.item("table", table.getQualifiedName())
.itemIf("tableId",
table.unwrap(IgniteTable.class).id().toString(),
- pw.getDetailLevel() == ALL_ATTRIBUTES)
- .itemIf("tableVer", table.unwrap(IgniteTable.class).version(),
- pw.getDetailLevel() == ALL_ATTRIBUTES)
- );
+ pw.getDetailLevel() == ALL_ATTRIBUTES));
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 01d2fba58d..2ecdfdbe0e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -35,6 +35,8 @@ public class IgniteSchema extends AbstractSchema {
private final Map<UUID, IgniteIndex> idxMap;
+ private final long schemaVersion;
+
/**
* Creates a Schema with given tables and indexes.
*
@@ -45,11 +47,13 @@ public class IgniteSchema extends AbstractSchema {
public IgniteSchema(
String schemaName,
@Nullable Map<String, Table> tableMap,
- @Nullable Map<UUID, IgniteIndex> indexMap
+ @Nullable Map<UUID, IgniteIndex> indexMap,
+ long schemaVersion
) {
this.schemaName = schemaName;
this.tblMap = tableMap == null ? new ConcurrentHashMap<>() : new
ConcurrentHashMap<>(tableMap);
this.idxMap = indexMap == null ? new ConcurrentHashMap<>() : new
ConcurrentHashMap<>(indexMap);
+ this.schemaVersion = schemaVersion;
}
/**
@@ -58,11 +62,20 @@ public class IgniteSchema extends AbstractSchema {
* @param schemaName A name of the schema to create.
*/
public IgniteSchema(String schemaName) {
- this(schemaName, null, null);
+ this(schemaName, null, null, -1);
+ }
+
+ /**
+ * Creates an empty Schema with the given schema version.
+ *
+ * @param schemaName A name of the schema to create.
+ */
+ public IgniteSchema(String schemaName, long schemaVersion) {
+ this(schemaName, null, null, schemaVersion);
}
- public static IgniteSchema copy(IgniteSchema old) {
- return new IgniteSchema(old.schemaName, old.tblMap, old.idxMap);
+ public static IgniteSchema copy(IgniteSchema old, long schemaVersion) {
+ return new IgniteSchema(old.schemaName, old.tblMap, old.idxMap,
schemaVersion);
}
/**
@@ -127,4 +140,11 @@ public class IgniteSchema extends AbstractSchema {
public IgniteIndex index(UUID indexId) {
return idxMap.get(indexId);
}
+
+ /**
+ * Returns the actual schema version.
+ */
+ public long schemaVersion() {
+ return schemaVersion;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
index 251d4994f9..75462e7a47 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.sql.engine.schema;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.calcite.schema.SchemaPlus;
import org.jetbrains.annotations.Nullable;
/**
- * SchemaHolder interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Interface for SQL schema operations.
*/
public interface SqlSchemaManager {
/**
@@ -35,9 +35,13 @@ public interface SqlSchemaManager {
* Returns a table by given id.
*
* @param id An id of required table.
- * @param ver Minimal required version.
*
* @return The table.
*/
- IgniteTable tableById(UUID id, int ver);
+ IgniteTable tableById(UUID id);
+
+ /**
+ * Waits for the {@code ver} schema version. This is just a stub and needs to be removed
after IGNITE-18733.
+ */
+ CompletableFuture<?> actualSchemaAsync(long ver);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index db5a68e378..ee91d68700 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.sql.engine.schema;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.causality.VersionedValue.NOT_INITIALIZED;
import static
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.OBJECT_NOT_FOUND_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_EVALUATION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.TABLE_VER_NOT_FOUND_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
@@ -35,12 +36,14 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.causality.OutdatedTokenException;
import org.apache.ignite.internal.causality.VersionedValue;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.Index;
@@ -57,7 +60,6 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -69,10 +71,10 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
+ private final Map<UUID, CompletableFuture<?>> pkIdxReady = new
ConcurrentHashMap<>();
private final VersionedValue<Map<UUID, IgniteIndex>> indicesVv;
- private final TableManager tableManager;
private final SchemaManager schemaManager;
private final ReplicaService replicaService;
private final HybridClock clock;
@@ -96,7 +98,6 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager
{
Consumer<Function<Long, CompletableFuture<?>>> registry,
IgniteSpinBusyLock busyLock
) {
- this.tableManager = tableManager;
this.schemaManager = schemaManager;
this.replicaService = replicaService;
this.clock = clock;
@@ -145,63 +146,62 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
public SchemaPlus schema(@Nullable String schema) {
SchemaPlus schemaPlus = calciteSchemaVv.latest();
+ // Stub for waiting on PK indexes; a clearer place for this would be IgniteSchema.
+
CompletableFuture.allOf(pkIdxReady.values().toArray(CompletableFuture[]::new)).join();
+
return schema != null ? schemaPlus.getSubSchema(schema) :
schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
}
/** {@inheritDoc} */
@Override
- @NotNull
- public IgniteTable tableById(UUID id, int ver) {
+ public CompletableFuture<?> actualSchemaAsync(long ver) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
try {
- IgniteTable table = tablesVv.latest().get(id);
-
- // there is a chance that someone tries to resolve table before
- // the distributed event of that table creation has been processed
- // by TableManager, so we need to get in sync with the TableManager
- if (table == null || ver > table.version()) {
- table = awaitLatestTableSchema(id);
+ if (ver == NOT_INITIALIZED) {
+ return completedFuture(null);
}
- if (table == null) {
- throw new IgniteInternalException(OBJECT_NOT_FOUND_ERR,
- IgniteStringFormatter.format("Table not found
[tableId={}]", id));
- }
+ CompletableFuture<SchemaPlus> lastSchemaFut;
- if (table.version() < ver) {
- throw new IgniteInternalException(TABLE_VER_NOT_FOUND_ERR,
- IgniteStringFormatter.format("Table version not found
[tableId={}, requiredVer={}, latestKnownVer={}]",
- id, ver, table.version()));
+ try {
+ lastSchemaFut = calciteSchemaVv.get(ver);
+ } catch (OutdatedTokenException e) {
+ return completedFuture(null);
}
- return table;
+ return lastSchemaFut;
} finally {
busyLock.leaveBusy();
}
}
- public void registerListener(SchemaUpdateListener listener) {
- listeners.add(listener);
- }
-
- private @Nullable IgniteTable awaitLatestTableSchema(UUID tableId) {
+ /** {@inheritDoc} */
+ @Override
+ @NotNull
+ public IgniteTable tableById(UUID id) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
try {
- TableImpl table = tableManager.table(tableId);
+ IgniteTable table = tablesVv.latest().get(id);
if (table == null) {
- return null;
+ throw new IgniteInternalException(OBJECT_NOT_FOUND_ERR,
+ format("Table not found [tableId={}]", id));
}
- table.schemaView().waitLatestSchema();
-
- return convert(table);
- } catch (NodeStoppingException e) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, e);
+ return table;
+ } finally {
+ busyLock.leaveBusy();
}
}
+ public void registerListener(SchemaUpdateListener listener) {
+ listeners.add(listener);
+ }
+
/**
* OnSqlTypeCreated.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -223,10 +223,12 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
IgniteSchema schema = res.compute(schemaName,
- (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
+ (k, v) -> v == null ? new IgniteSchema(schemaName,
causalityToken) : IgniteSchema.copy(v, causalityToken));
CompletableFuture<IgniteTableImpl> igniteTableFuture =
convert(causalityToken, table);
+ pkIdxReady.put(table.tableId(), new CompletableFuture<>());
+
return tablesVv.update(causalityToken, (tables, ex) ->
inBusyLock(busyLock, () -> {
if (ex != null) {
@@ -295,13 +297,15 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
IgniteSchema schema = res.compute(schemaName,
- (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
+ (k, v) -> v == null ? new IgniteSchema(schemaName,
causalityToken) : IgniteSchema.copy(v, causalityToken));
IgniteTable table = (IgniteTable) schema.getTable(tableName);
if (table != null) {
schema.removeTable(tableName);
+ pkIdxReady.remove(table.id());
+
return tablesVv.update(causalityToken, (tables, ex) ->
inBusyLock(busyLock, () -> {
if (ex != null) {
return failedFuture(ex);
@@ -414,7 +418,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
IgniteSchema schema = res.compute(schemaName,
- (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
+ (k, v) -> v == null ? new IgniteSchema(schemaName,
causalityToken) : IgniteSchema.copy(v, causalityToken));
return tablesVv.update(
causalityToken,
@@ -441,7 +445,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
resIdxs.put(index.id(), schemaIndex);
- return
CompletableFuture.completedFuture(resIdxs);
+ return completedFuture(resIdxs);
})
).thenCompose(ignore -> {
table.addIndex(schemaIndex);
@@ -454,6 +458,15 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
).thenCompose(v -> completedFuture(res));
}));
+ // this stub is necessary for observing pk index creation.
+ schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable)
-> {
+ CompletableFuture<?> pkFut = pkIdxReady.get(index.tableId());
+ // this listener is called repeatedly on node stop.
+ if (pkFut != null) {
+ pkFut.complete(null);
+ }
+ });
+
return calciteSchemaVv.get(causalityToken);
} finally {
busyLock.leaveBusy();
@@ -481,9 +494,10 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
IgniteSchema schema = res.compute(schemaName,
- (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
+ (k, v) -> v == null ? new IgniteSchema(schemaName,
causalityToken) : IgniteSchema.copy(v, causalityToken));
IgniteIndex rmvIndex = schema.removeIndex(indexId);
+
if (rmvIndex != null) {
return tablesVv.update(
causalityToken,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 66bf84df95..3985f225c8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -50,9 +50,9 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryCancel;
-import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.util.ArrayUtils;
@@ -157,7 +157,7 @@ public final class BaseQueryContext extends
AbstractQueryContext {
private CalciteCatalogReader catalogReader;
- private long plannerTimeout;
+ private final long plannerTimeout;
/**
* Private constructor, used by a builder.
@@ -232,6 +232,10 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return plannerTimeout;
}
+ public long schemaVersion() {
+ return
Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).schemaVersion();
+ }
+
/**
* Returns calcite catalog reader.
*/
@@ -292,8 +296,6 @@ public final class BaseQueryContext extends
AbstractQueryContext {
private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY;
- private TxAttributes txAttributes;
-
private long plannerTimeout;
public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
@@ -321,18 +323,14 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return this;
}
- public Builder txAttributes(TxAttributes txAttributes) {
- this.txAttributes = txAttributes;
- return this;
- }
-
public Builder plannerTimeout(long plannerTimeout) {
this.plannerTimeout = plannerTimeout;
return this;
}
public BaseQueryContext build() {
- return new BaseQueryContext(queryId, frameworkCfg, cancel,
parameters, log, plannerTimeout);
+ return new BaseQueryContext(queryId, frameworkCfg, cancel,
parameters,
+ log, plannerTimeout);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
index 10c60656dd..9cac261113 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
@@ -46,7 +46,7 @@ public class HashFunctionFactoryImpl<T> implements
HashFunctionFactory<T> {
public RowHashFunction<T> create(int[] fields, UUID tableId) {
int fieldCnt = fields.length;
NativeType[] fieldTypes = new NativeType[fieldCnt];
- TableDescriptor tblDesc = sqlSchemaManager.tableById(tableId,
-1).descriptor();
+ TableDescriptor tblDesc =
sqlSchemaManager.tableById(tableId).descriptor();
ImmutableIntList colocationColumns = tblDesc.distribution().getKeys();
assert colocationColumns.size() == fieldCnt : "fieldsCount=" +
fieldCnt + ", colocationColumns=" + colocationColumns;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 5c800ed23c..3c672e2748 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -49,6 +49,8 @@ import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.IndexManager;
+import org.apache.ignite.internal.index.event.IndexEvent;
+import org.apache.ignite.internal.index.event.IndexEventParameters;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.EventListener;
@@ -64,6 +66,7 @@ import
org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
@@ -140,6 +143,8 @@ public class StopCalciteModuleTest {
private final ClusterNode localNode = new ClusterNode("mock-node-id",
NODE_NAME, null);
+ private UUID tblId = UUID.randomUUID();
+
/**
* Before.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -168,12 +173,21 @@ public class StopCalciteModuleTest {
doAnswer(invocation -> {
EventListener<TableEventParameters> clo =
(EventListener<TableEventParameters>) invocation.getArguments()[1];
- clo.notify(new TableEventParameters(0, UUID.randomUUID(), "TEST",
new TableImpl(tbl, schemaReg, new HeapLockManager())),
+ clo.notify(new TableEventParameters(0, tblId, "TEST", new
TableImpl(tbl, schemaReg, new HeapLockManager())),
null);
return null;
}).when(tableManager).listen(eq(TableEvent.CREATE), any());
+ doAnswer(invocation -> {
+ EventListener<IndexEventParameters> clo =
(EventListener<IndexEventParameters>) invocation.getArguments()[1];
+
+ clo.notify(new IndexEventParameters(0,
TestHashIndex.create(List.of("ID"), "pk_idx", tblId)),
+ null);
+
+ return null;
+ }).when(indexManager).listen(eq(IndexEvent.CREATE), any());
+
RowAssembler asm = new RowAssembler(schemaReg.schema());
asm.appendInt(0);
@@ -227,7 +241,7 @@ public class StopCalciteModuleTest {
clock
);
- when(tbl.tableId()).thenReturn(UUID.randomUUID());
+ when(tbl.tableId()).thenReturn(tblId);
when(tbl.primaryReplicas()).thenReturn(List.of(new
PrimaryReplica(localNode, -1L)));
when(txManager.begin(anyBoolean())).thenReturn(new
NoOpTransaction(localNode.name()));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index aee1b4c8a8..5f944a52ff 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -30,7 +30,7 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,12 +39,14 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
@@ -119,7 +121,7 @@ public class ExecutionServiceImplTest {
private final TestTable table = createTable("TEST_TBL", 1_000_000,
IgniteDistributions.random(),
"ID", NativeTypes.INT32, "VAL", NativeTypes.INT32);
- private final IgniteSchema schema = new IgniteSchema("PUBLIC",
Map.of(table.name(), table), null);
+ private final IgniteSchema schema = new IgniteSchema("PUBLIC",
Map.of(table.name(), table), null, -1);
private TestCluster testCluster;
private List<ExecutionServiceImpl<?>> executionServices;
@@ -147,7 +149,7 @@ public class ExecutionServiceImplTest {
public void testCloseByCursor() throws Exception {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
@@ -182,7 +184,7 @@ public class ExecutionServiceImplTest {
public void testCancelOnInitiator() throws InterruptedException {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
@@ -217,7 +219,7 @@ public class ExecutionServiceImplTest {
public void testInitializationFailedOnRemoteNode() throws
InterruptedException {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
@@ -270,7 +272,7 @@ public class ExecutionServiceImplTest {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
@@ -297,7 +299,7 @@ public class ExecutionServiceImplTest {
public void testCancelOnRemote() throws InterruptedException {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
@@ -333,7 +335,7 @@ public class ExecutionServiceImplTest {
public void testCursorIsClosedAfterAllDataRead() throws
InterruptedException {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
InternalTransaction tx = new NoOpTransaction(nodeNames.get(0));
AsyncCursor<List<Object>> cursor = execService.executePlan(tx, plan,
ctx);
@@ -360,7 +362,7 @@ public class ExecutionServiceImplTest {
public void testCursorIsClosedAfterAllDataRead2() throws
InterruptedException {
ExecutionService execService = executionServices.get(0);
BaseQueryContext ctx = createContext();
- QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
+ QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
InternalTransaction tx = new NoOpTransaction(nodeNames.get(0));
AsyncCursor<List<Object>> cursor = execService.executePlan(tx, plan,
ctx);
@@ -400,7 +402,15 @@ public class ExecutionServiceImplTest {
when(topologyService.localMember()).thenReturn(clusterNode);
- when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
+ when(schemaManagerMock.tableById(any())).thenReturn(table);
+
+
when(schemaManagerMock.actualSchemaAsync(isA(long.class))).thenReturn(CompletableFuture.completedFuture(null));
+
+ CalciteSchema rootSch = CalciteSchema.createRootSchema(false);
+ rootSch.add(schema.getName(), schema);
+ SchemaPlus plus = rootSch.plus();
+
+ when(schemaManagerMock.schema(any())).thenReturn(plus);
var executionService = new ExecutionServiceImpl<>(
messageService,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 3f19350c2d..5a6bb03f48 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -20,21 +20,15 @@ package org.apache.ignite.internal.sql.engine.exec.schema;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -55,6 +49,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
@@ -64,8 +59,6 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.NodeStoppingException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -82,10 +75,8 @@ public class SqlSchemaManagerTest {
private final UUID indexId = UUID.randomUUID();
- private final int tableVer = 1;
-
private final SchemaDescriptor schemaDescriptor = new SchemaDescriptor(
- tableVer,
+ 1,
new Column[]{new Column(0, "ID", NativeTypes.INT64, false)},
new Column[]{new Column(1, "VAL", NativeTypes.INT64, false)}
);
@@ -136,116 +127,6 @@ public class SqlSchemaManagerTest {
testRevisionRegister.moveForward();
}
- @Test
- public void testNonExistingTable() throws NodeStoppingException {
- UUID tblId = UUID.randomUUID();
-
- IgniteInternalException ex =
assertThrows(IgniteInternalException.class, () ->
sqlSchemaManager.tableById(tblId, tableVer));
- assertThat(ex.getMessage(), containsString("Table not found"));
-
- Mockito.verify(tableManager).table(eq(tblId));
-
- verifyNoMoreInteractions(tableManager);
- }
-
- @Test
- public void testTableEventIsNotProcessed() throws NodeStoppingException {
- when(tableManager.table(eq(tableId))).thenReturn(table);
- when(table.schemaView()).thenReturn(schemaRegistry);
-
- InternalTable mock = mock(InternalTable.class);
- when(mock.tableId()).thenReturn(tableId);
-
- when(table.internalTable()).thenReturn(mock);
- when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
-
- when(schemaManager.schemaRegistry(any())).thenReturn(schemaRegistry);
-
- IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer);
-
- assertEquals(tableId, actTable.id());
-
- Mockito.verify(tableManager).table(eq(tableId));
-
- verifyNoMoreInteractions(tableManager);
- }
-
- @Test
- public void testTableEventIsProcessedRequiredVersionIsSame() {
- InternalTable mock = mock(InternalTable.class);
- when(mock.tableId()).thenReturn(tableId);
- when(mock.name()).thenReturn("PUBLIC.T");
-
- when(table.internalTable()).thenReturn(mock);
- when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
-
- when(schemaManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(schemaRegistry));
-
- sqlSchemaManager.onTableCreated("PUBLIC", table,
testRevisionRegister.actualToken() + 1);
- testRevisionRegister.moveForward();
-
- IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer);
-
- assertEquals(tableId, actTable.id());
-
- verifyNoMoreInteractions(tableManager);
- }
-
- @Test
- public void testTableEventIsProcessedRequiredVersionIsLess() {
- InternalTable mock = mock(InternalTable.class);
- when(mock.tableId()).thenReturn(tableId);
- when(mock.name()).thenReturn("PUBLIC.T");
-
- when(table.internalTable()).thenReturn(mock);
- when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
-
- when(schemaManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(schemaRegistry));
-
- sqlSchemaManager.onTableCreated("PUBLIC", table,
testRevisionRegister.actualToken() + 1);
- testRevisionRegister.moveForward();
-
- IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer -
1);
-
- assertEquals(tableId, actTable.id());
-
- verifyNoMoreInteractions(tableManager);
- }
-
- @Test
- public void testTableEventIsProcessedRequiredVersionIsGreater() throws
NodeStoppingException {
- when(table.schemaView()).thenReturn(schemaRegistry);
-
- InternalTable mock = mock(InternalTable.class);
- when(mock.tableId()).thenReturn(tableId);
- when(mock.name()).thenReturn("PUBLIC.T");
-
- when(table.internalTable()).thenReturn(mock);
- when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
- when(schemaRegistry.lastSchemaVersion()).thenReturn(tableVer - 1);
- when(schemaManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(schemaRegistry));
- when(schemaManager.schemaRegistry(any())).thenReturn(schemaRegistry);
-
- sqlSchemaManager.onTableCreated("PUBLIC", table,
testRevisionRegister.actualToken() + 1);
- testRevisionRegister.moveForward();
-
- when(tableManager.table(eq(tableId))).thenReturn(table);
- when(schemaRegistry.lastSchemaVersion()).thenReturn(tableVer);
-
- IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer);
- assertEquals(tableId, actTable.id());
-
- IgniteInternalException ex =
assertThrows(IgniteInternalException.class, () ->
sqlSchemaManager.tableById(tableId, tableVer + 1));
- assertThat(ex.getMessage(), containsString("Table version not found"));
-
- Mockito.verify(tableManager, times(2)).table(eq(tableId));
-
- verifyNoMoreInteractions(tableManager);
- }
-
@Test
public void testOnTableDroppedHandler() {
when(table.name()).thenReturn("T");
@@ -255,6 +136,7 @@ public class SqlSchemaManagerTest {
when(mock.name()).thenReturn("T");
when(table.internalTable()).thenReturn(mock);
+ when(table.tableId()).thenReturn(tableId);
when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
@@ -262,6 +144,9 @@ public class SqlSchemaManagerTest {
sqlSchemaManager.onTableCreated("PUBLIC", table,
testRevisionRegister.actualToken() + 1);
testRevisionRegister.moveForward();
+ sqlSchemaManager.onIndexCreated(
+ TestHashIndex.create(List.of("ID"), "pk_idx", tableId),
testRevisionRegister.actualToken() + 1);
+ testRevisionRegister.moveForward();
Table schemaTable = sqlSchemaManager.schema("PUBLIC").getTable("T");
@@ -282,6 +167,7 @@ public class SqlSchemaManagerTest {
when(mock.name()).thenReturn("T");
when(table.internalTable()).thenReturn(mock);
+ when(table.tableId()).thenReturn(tableId);
when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
when(schemaManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(schemaRegistry));
@@ -289,7 +175,11 @@ public class SqlSchemaManagerTest {
sqlSchemaManager.onTableCreated("PUBLIC", table,
testRevisionRegister.actualToken() + 1);
testRevisionRegister.moveForward();
- assertTrue(((IgniteTableImpl)
sqlSchemaManager.schema("PUBLIC").getTable("T")).indexes().isEmpty());
+ sqlSchemaManager.onIndexCreated(
+ TestHashIndex.create(List.of("ID"), "pk_idx", tableId),
testRevisionRegister.actualToken() + 1);
+ testRevisionRegister.moveForward();
+
+ assertEquals(1, ((IgniteTableImpl)
sqlSchemaManager.schema("PUBLIC").getTable("T")).indexes().size());
IndexDescriptor descMock = mock(IndexDescriptor.class);
when(descMock.columns()).thenReturn(List.of());
@@ -323,18 +213,22 @@ public class SqlSchemaManagerTest {
@Test
- public void testIndexEventsProcessed() throws Exception {
+ public void testIndexEventsProcessed() {
InternalTable mock = mock(InternalTable.class);
when(mock.tableId()).thenReturn(tableId);
when(mock.name()).thenReturn("T");
when(table.internalTable()).thenReturn(mock);
+ when(table.tableId()).thenReturn(tableId);
when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
when(schemaManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(schemaRegistry));
sqlSchemaManager.onTableCreated("PUBLIC", table,
testRevisionRegister.actualToken() + 1);
testRevisionRegister.moveForward();
+ sqlSchemaManager.onIndexCreated(
+ TestHashIndex.create(List.of("ID"), "pk_idx", tableId),
testRevisionRegister.actualToken() + 1);
+ testRevisionRegister.moveForward();
IndexDescriptor descMock = mock(IndexDescriptor.class);
when(descMock.columns()).thenReturn(List.of());
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReaderTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReaderTest.java
deleted file mode 100644
index 0d61f3a638..0000000000
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReaderTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.externalize;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.UUID;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.schema.Statistic;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests to verify {@link RelJsonReader} behaviour.
- */
-public class RelJsonReaderTest {
-
- /**
- * Test verifies that during deserialization table being resolved by its
ID.
- */
- @Test
- void fromJson() {
- UUID tableId = UUID.randomUUID();
- int tableVer = 2;
-
- IgniteTable igniteTableMock = mock(IgniteTable.class);
- when(igniteTableMock.getStatistic()).thenReturn(new Statistic() {});
-
when(igniteTableMock.getRowType(any())).thenReturn(mock(RelDataType.class));
-
- SqlSchemaManager schemaMock = mock(SqlSchemaManager.class);
- when(schemaMock.tableById(tableId,
tableVer)).thenReturn(igniteTableMock);
-
- String json = ""
- + "{\n"
- + " \"rels\" : [ {\n"
- + " \"id\" : \"0\",\n"
- + " \"relOp\" : \"IgniteTableScan\",\n"
- + " \"table\" : [\"PUBLIC\", \"TEST\"],\n"
- + " \"tableId\" : \"" + tableId + "\",\n"
- + " \"tableVer\" : " + tableVer + ",\n"
- + " \"inputs\" : [ ]\n"
- + " } ]\n"
- + "}";
-
- RelNode node = RelJsonReader.fromJson(schemaMock, json);
-
- assertThat(node, isA(IgniteTableScan.class));
- assertThat(node.getTable(), notNullValue());
- assertThat(node.getTable().unwrap(IgniteTable.class),
is(igniteTableMock));
- Mockito.verify(schemaMock).tableById(tableId, tableVer);
- }
-}
\ No newline at end of file
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index bd9e9bc4ce..a34dff4c1f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
@@ -45,12 +46,12 @@ public class PredefinedSchemaManager implements
SqlSchemaManager {
private final Map<UUID, IgniteTable> tableById;
/** Constructs schema manager from a single schema. */
- public PredefinedSchemaManager(IgniteSchema schema) {
+ PredefinedSchemaManager(IgniteSchema schema) {
this(List.of(schema));
}
/** Constructs schema manager from a collection of schemas. */
- public PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
+ PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
this.root = Frameworks.createRootSchema(false);
this.tableById = new HashMap<>();
@@ -72,9 +73,14 @@ public class PredefinedSchemaManager implements
SqlSchemaManager {
return schema == null ? root : root.getSubSchema(schema);
}
+ @Override
+ public CompletableFuture<?> actualSchemaAsync(long ver) {
+ return CompletableFuture.completedFuture(null);
+ }
+
/** {@inheritDoc} */
@Override
- public IgniteTable tableById(UUID id, int ver) {
+ public IgniteTable tableById(UUID id) {
return tableById.get(id);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 6e82df41da..98ff797af7 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -267,7 +267,7 @@ public class TestBuilders {
.map(ClusterTableBuilderImpl::build)
.collect(Collectors.toMap(TestTable::name,
Function.identity()));
- var schemaManager = new PredefinedSchemaManager(new
IgniteSchema("PUBLIC", tableMap, null));
+ var schemaManager = new PredefinedSchemaManager(new
IgniteSchema("PUBLIC", tableMap, null, -1));
Map<String, TestNode> nodes = nodeNames.stream()
.map(name -> new TestNode(name,
clusterService.forNode(name), schemaManager))
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 454cbfe40a..c31027e6d3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -94,7 +94,6 @@ import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
-import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
import org.apache.ignite.internal.sql.engine.prepare.Cloner;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -698,8 +697,10 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
List<RelNode> deserializedNodes = new ArrayList<>();
+ BaseQueryContext ctx = baseQueryContext(schemas, null);
+
for (String s : serialized) {
- RelJsonReader reader = new RelJsonReader(new
PredefinedSchemaManager(schemas));
+ RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
deserializedNodes.add(reader.read(s));
}
@@ -1246,20 +1247,33 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
}
}
- static class TestHashIndex implements Index<IndexDescriptor> {
+ /** Test Hash index implementation. */
+ public static class TestHashIndex implements Index<IndexDescriptor> {
private final UUID id = UUID.randomUUID();
- private final UUID tableId = UUID.randomUUID();
+ private UUID tableId = UUID.randomUUID();
private final IndexDescriptor descriptor;
+ /** Create index. */
+ public static TestHashIndex create(List<String> indexedColumns, String
name, UUID tableId) {
+ var descriptor = new IndexDescriptor(name, indexedColumns);
+
+ TestHashIndex idx = new TestHashIndex(descriptor);
+
+ idx.tableId = tableId;
+
+ return idx;
+ }
+
+ /** Create index. */
public static TestHashIndex create(List<String> indexedColumns, String
name) {
var descriptor = new IndexDescriptor(name, indexedColumns);
return new TestHashIndex(descriptor);
}
- public TestHashIndex(IndexDescriptor descriptor) {
+ TestHashIndex(IndexDescriptor descriptor) {
this.descriptor = descriptor;
}