This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19497 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a1ba302325fd824a8a124d7a5b4b14523fda718e Author: amashenkov <[email protected]> AuthorDate: Tue Aug 1 21:01:30 2023 +0300 Fix HashFunctionFactoryImpl to use specific schema version. Fix SqlSchemaManager interface. Drop unused code. --- .../ignite/internal/sql/api/ItCommonApiTest.java | 17 +- .../internal/sql/engine/SqlQueryProcessor.java | 146 +----- .../sql/engine/exec/ExecutionServiceImpl.java | 3 +- .../sql/engine/schema/CatalogSqlSchemaManager.java | 18 +- .../sql/engine/schema/SqlSchemaManager.java | 24 +- .../sql/engine/schema/SqlSchemaManagerImpl.java | 584 --------------------- .../internal/sql/engine/util/BaseQueryContext.java | 4 +- .../sql/engine/util/HashFunctionFactoryImpl.java | 19 +- .../sql/engine/exec/ExecutionServiceImplTest.java | 7 +- .../engine/exec/schema/SqlSchemaManagerTest.java | 329 ------------ .../engine/framework/PredefinedSchemaManager.java | 16 +- .../internal/sql/engine/framework/TestNode.java | 4 +- .../engine/schema/CatalogSqlSchemaManagerTest.java | 22 +- 13 files changed, 55 insertions(+), 1138 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java index 5b46b788bb..a35925eb5a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException; -import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; @@ -222,12 +221,6 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest { } private static class ErroneousSchemaManager implements SqlSchemaManager { - /** {@inheritDoc} */ - @Override - public SchemaPlus schema(@Nullable String schema) { - return null; - } - /** {@inheritDoc} */ @Override public SchemaPlus schema(@Nullable String name, int version) { @@ -236,19 +229,13 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest { /** {@inheritDoc} */ @Override - public IgniteTable tableById(int id) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<SchemaPlus> actualSchemaAsync(long ver) { + public CompletableFuture<?> actualSchemaAsync(long ver) { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public SchemaPlus activeSchema(@Nullable String name, long timestamp) { + public SchemaPlus latestSchema(@Nullable String name) { throw new UnsupportedOperationException(); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 9098326e46..3b19070984 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.index.event.IndexEvent; -import org.apache.ignite.internal.index.event.IndexEventParameters; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.Event; @@ -68,8 +67,8 @@ import org.apache.ignite.internal.sql.engine.prepare.PrepareService; import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; +import org.apache.ignite.internal.sql.engine.schema.CatalogSqlSchemaManager; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; -import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl; import org.apache.ignite.internal.sql.engine.session.Session; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionInfo; @@ -85,7 +84,6 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.event.TableEvent; -import org.apache.ignite.internal.table.event.TableEventParameters; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -95,11 +93,10 @@ import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.lang.SchemaNotFoundException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.sql.SqlException; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * SqlQueryProcessor. + * SqlQueryProcessor. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ public class SqlQueryProcessor implements QueryProcessor { @@ -205,15 +202,10 @@ public class SqlQueryProcessor implements QueryProcessor { this.clock = clock; this.catalogManager = catalogManager; - sqlSchemaManager = new SqlSchemaManagerImpl( - tableManager, - schemaManager, - registry, - busyLock + sqlSchemaManager = new CatalogSqlSchemaManager( + catalogManager, + 0 // TOOD fix cache size ); - - registerTableListener(TableEvent.CREATE, new TableCreatedListener((SqlSchemaManagerImpl) sqlSchemaManager)); - registerIndexListener(IndexEvent.CREATE, new IndexCreatedListener((SqlSchemaManagerImpl) sqlSchemaManager)); } /** {@inheritDoc} */ @@ -245,8 +237,6 @@ public class SqlQueryProcessor implements QueryProcessor { msgSrvc )); - ((SqlSchemaManagerImpl) sqlSchemaManager).registerListener(prepareSvc); - this.prepareSvc = prepareSvc; var ddlCommandHandler = new DdlCommandHandlerWrapper( @@ -261,8 +251,6 @@ public class SqlQueryProcessor implements QueryProcessor { var dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry); - ((SqlSchemaManagerImpl) sqlSchemaManager).registerListener(executableTableRegistry); - var executionSrvc = registerService(ExecutionServiceImpl.create( clusterSrvc.topologyService(), msgSrvc, @@ -280,11 +268,6 @@ public class SqlQueryProcessor implements QueryProcessor { this.executionSrvc = executionSrvc; - registerTableListener(TableEvent.ALTER, new TableUpdatedListener(((SqlSchemaManagerImpl) sqlSchemaManager))); - registerTableListener(TableEvent.DROP, new TableDroppedListener(((SqlSchemaManagerImpl) sqlSchemaManager))); - - registerIndexListener(IndexEvent.DROP, new IndexDroppedListener(((SqlSchemaManagerImpl) sqlSchemaManager))); - services.forEach(LifecycleAware::start); } @@ -364,18 +347,6 @@ public class SqlQueryProcessor implements QueryProcessor { return service; } - private void registerTableListener(TableEvent evt, AbstractTableEventListener lsnr) { - evtLsnrs.add(Pair.of(evt, lsnr)); - - tableManager.listen(evt, lsnr); - } - - private void registerIndexListener(IndexEvent evt, AbstractIndexEventListener lsnr) { - evtLsnrs.add(Pair.of(evt, lsnr)); - - indexManager.listen(evt, lsnr); - } - private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0( SessionId sessionId, QueryContext context, @@ -490,113 +461,6 @@ public class SqlQueryProcessor implements QueryProcessor { return stage; } - private abstract static class AbstractTableEventListener implements EventListener<TableEventParameters> { - protected final SqlSchemaManagerImpl schemaHolder; - - private AbstractTableEventListener(SqlSchemaManagerImpl schemaHolder) { - this.schemaHolder = schemaHolder; - } - } - - private abstract static class AbstractIndexEventListener implements EventListener<IndexEventParameters> { - protected final SqlSchemaManagerImpl schemaHolder; - - private AbstractIndexEventListener(SqlSchemaManagerImpl schemaHolder) { - this.schemaHolder = schemaHolder; - } - } - - private static class TableCreatedListener extends AbstractTableEventListener { - private TableCreatedListener(SqlSchemaManagerImpl schemaHolder) { - super(schemaHolder); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) { - return schemaHolder.onTableCreated( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) - .thenApply(v -> false); - } - } - - private static class TableUpdatedListener extends AbstractTableEventListener { - private TableUpdatedListener(SqlSchemaManagerImpl schemaHolder) { - super(schemaHolder); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) { - return schemaHolder.onTableUpdated( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) - .thenApply(v -> false); - } - } - - private static class TableDroppedListener extends AbstractTableEventListener { - private TableDroppedListener(SqlSchemaManagerImpl schemaHolder) { - super(schemaHolder); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) { - return schemaHolder.onTableDropped( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) - .thenApply(v -> false); - } - } - - private static class IndexDroppedListener extends AbstractIndexEventListener { - private IndexDroppedListener(SqlSchemaManagerImpl schemaHolder) { - super(schemaHolder); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) { - return schemaHolder.onIndexDropped( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.indexId(), - parameters.causalityToken() - ) - .thenApply(v -> false); - } - } - - private static class IndexCreatedListener extends AbstractIndexEventListener { - private IndexCreatedListener(SqlSchemaManagerImpl schemaHolder) { - super(schemaHolder); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) { - return schemaHolder.onIndexCreated( - parameters.tableId(), - parameters.indexId(), - parameters.indexDescriptor(), - parameters.causalityToken() - ) - .thenApply(v -> false); - } - } - /** Returns {@code true} if this is data modification operation. */ private static boolean dataModificationOp(ParsedResult parsedResult) { return parsedResult.queryType() == SqlQueryType.DML; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 50428f0b91..e330ebd4ed 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel; +import org.apache.ignite.internal.sql.engine.schema.IgniteCatalogSchema; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; @@ -172,7 +173,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve dependencyResolver, (ctx, deps) -> new LogicalRelImplementor<>( ctx, - new HashFunctionFactoryImpl<>(sqlSchemaManager, handler), + new HashFunctionFactoryImpl<>(ctx.getRootSchema().unwrap(IgniteCatalogSchema.class), handler), mailboxRegistry, exchangeSrvc, deps) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java index 371f885abc..432ad5834c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java @@ -67,13 +67,6 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { this.cache = Caffeine.newBuilder().maximumSize(cacheSize).<Map.Entry<String, Integer>, SchemaPlus>build().asMap(); } - /** {@inheritDoc} */ - @Override - public SchemaPlus schema(@Nullable String schema) { - // Should be removed -schema(name, version) must be used instead - throw new UnsupportedOperationException(); - } - /** {@inheritDoc} */ @Override public SchemaPlus schema(String name, int version) { @@ -83,13 +76,6 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { return cache.computeIfAbsent(entry, (e) -> createSqlSchema(e.getValue(), catalogManager.schema(e.getKey(), e.getValue()))); } - /** {@inheritDoc} */ - @Override - public IgniteTable tableById(int id) { - // Should be removed - this method is used to obtain native types from a table. - throw new UnsupportedOperationException(); - } - /** {@inheritDoc} */ @Override public CompletableFuture<?> actualSchemaAsync(long ver) { @@ -98,10 +84,10 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { /** {@inheritDoc} */ @Override - public SchemaPlus activeSchema(@Nullable String name, long timestamp) { + public SchemaPlus latestSchema(@Nullable String name) { String schemaName = name == null ? DEFAULT_SCHEMA_NAME : name; - int version = catalogManager.activeCatalogVersion(timestamp); + int version = catalogManager.activeCatalogVersion(Long.MAX_VALUE); CatalogSchemaDescriptor descriptor = catalogManager.schema(schemaName, version); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java index e2c1fb6e1b..25eb750e36 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java @@ -20,29 +20,19 @@ package org.apache.ignite.internal.sql.engine.schema; import java.util.concurrent.CompletableFuture; import org.apache.calcite.schema.SchemaPlus; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Sql schemas operations interface. */ public interface SqlSchemaManager { - /** - * Returns a required schema if specified, or default schema otherwise. - */ - SchemaPlus schema(@Nullable String schema); - /** * Returns schema with given name and by the given version, if name is not specified, returns default schema of the given version. - */ - SchemaPlus schema(@Nullable String name, int version); - - /** - * Returns a table by given id. - * - * @param id An id of required table. * - * @return The table. + * @param schemaName Schema name. If {@code null}, then default schema name will be used. + * @param version Catalog version. */ - IgniteTable tableById(int id); + SchemaPlus schema(@Nullable String schemaName, int version); /** * Wait for {@code ver} schema version, just a stub, need to be removed after IGNITE-18733. @@ -51,7 +41,9 @@ public interface SqlSchemaManager { CompletableFuture<?> actualSchemaAsync(long ver); /** - * Returns a required schema if specified, or default schema otherwise. + * Returns a schema corresponding to the latest Catalog version, which is available on node locally. + * Note: For test purposes only. Method does NOT wait for actual Catalog version in the grid. */ - SchemaPlus activeSchema(@Nullable String name, long timestamp); + @TestOnly + SchemaPlus latestSchema(@Nullable String name); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java deleted file mode 100644 index 1722d86074..0000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java +++ /dev/null @@ -1,584 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.engine.schema; - -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.CompletableFuture.failedFuture; -import static org.apache.ignite.internal.sql.engine.SqlQueryProcessor.DEFAULT_SCHEMA_NAME; -import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; -import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; -import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; -import static org.apache.ignite.lang.IgniteStringFormatter.format; - -import it.unimi.dsi.fastutil.ints.IntArrayList; -import it.unimi.dsi.fastutil.ints.IntList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.function.Consumer; -import java.util.function.DoubleSupplier; -import java.util.function.LongFunction; -import java.util.stream.Collectors; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.tools.Frameworks; -import org.apache.ignite.internal.causality.CompletableVersionedValue; -import org.apache.ignite.internal.causality.IncrementalVersionedValue; -import org.apache.ignite.internal.causality.OutdatedTokenException; -import org.apache.ignite.internal.index.HashIndex; -import org.apache.ignite.internal.index.Index; -import org.apache.ignite.internal.index.IndexDescriptor; -import org.apache.ignite.internal.index.SortedIndexDescriptor; -import org.apache.ignite.internal.index.SortedIndexImpl; -import org.apache.ignite.internal.schema.Column; -import org.apache.ignite.internal.schema.DefaultValueProvider; -import org.apache.ignite.internal.schema.DefaultValueProvider.Type; -import org.apache.ignite.internal.schema.SchemaDescriptor; -import org.apache.ignite.internal.schema.SchemaManager; -import org.apache.ignite.internal.schema.SchemaRegistry; -import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; -import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; -import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.table.TableImpl; -import org.apache.ignite.internal.table.distributed.TableManager; -import org.apache.ignite.internal.util.IgniteSpinBusyLock; -import org.apache.ignite.lang.IgniteInternalException; -import org.apache.ignite.lang.NodeStoppingException; -import org.jetbrains.annotations.Nullable; - -/** - * Holds actual schema and mutates it on schema change, requested by Ignite. - */ -public class SqlSchemaManagerImpl implements SqlSchemaManager { - private final IncrementalVersionedValue<Map<String, IgniteSchema>> schemasVv; - - private final IncrementalVersionedValue<Map<Integer, IgniteTable>> tablesVv; - - private final Map<Integer, CompletableFuture<?>> pkIdxReady = new ConcurrentHashMap<>(); - - private final IncrementalVersionedValue<Map<Integer, IgniteIndex>> indicesVv; - - private final TableManager tableManager; - private final SchemaManager schemaManager; - - private final CompletableVersionedValue<SchemaPlus> calciteSchemaVv; - - private final Set<SchemaUpdateListener> listeners = new CopyOnWriteArraySet<>(); - - /** Busy lock for stop synchronisation. */ - private final IgniteSpinBusyLock busyLock; - - /** - * Constructor. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ - public SqlSchemaManagerImpl( - TableManager tableManager, - SchemaManager schemaManager, - Consumer<LongFunction<CompletableFuture<?>>> registry, - IgniteSpinBusyLock busyLock - ) { - this.tableManager = tableManager; - this.schemaManager = schemaManager; - - schemasVv = new IncrementalVersionedValue<>(registry, HashMap::new); - tablesVv = new IncrementalVersionedValue<>(registry, HashMap::new); - indicesVv = new IncrementalVersionedValue<>(registry, HashMap::new); - this.busyLock = busyLock; - - calciteSchemaVv = new CompletableVersionedValue<>(() -> { - SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false); - newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new IgniteSchema(DEFAULT_SCHEMA_NAME)); - return newCalciteSchema; - }); - - schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> { - if (!busyLock.enterBusy()) { - calciteSchemaVv.completeExceptionally(token, new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException())); - - return; - } - try { - if (throwable != null) { - calciteSchemaVv.completeExceptionally( - token, - new IgniteInternalException( - INTERNAL_ERR, "Couldn't evaluate sql schemas for causality token: " + token, throwable) - ); - - return; - } - - SchemaPlus newCalciteSchema = rebuild(stringIgniteSchemaMap); - - listeners.forEach(SchemaUpdateListener::onSchemaUpdated); - - calciteSchemaVv.complete(token, newCalciteSchema); - } finally { - busyLock.leaveBusy(); - } - }); - } - - /** {@inheritDoc} */ - @Override - public SchemaPlus schema(@Nullable String schema) { - // stub for waiting pk indexes, more clear place is IgniteSchema - CompletableFuture.allOf(pkIdxReady.values().toArray(CompletableFuture[]::new)).join(); - - SchemaPlus schemaPlus = calciteSchemaVv.latest(); - - return schema != null ? schemaPlus.getSubSchema(schema) : schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME); - } - - /** {@inheritDoc} */ - @Override - public SchemaPlus schema(String name, int version) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override - public CompletableFuture<?> actualSchemaAsync(long ver) { - if (!busyLock.enterBusy()) { - throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); - } - try { - if (ver == IgniteSchema.INITIAL_VERSION) { - return completedFuture(calciteSchemaVv.latest()); - } - - CompletableFuture<SchemaPlus> lastSchemaFut; - - try { - lastSchemaFut = calciteSchemaVv.get(ver); - } catch (OutdatedTokenException e) { - return completedFuture(null); - } - - return lastSchemaFut; - } finally { - busyLock.leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override - public SchemaPlus activeSchema(@Nullable String name, long timestamp) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override - public IgniteTable tableById(int id) { - if (!busyLock.enterBusy()) { - throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); - } - try { - IgniteTable table = tablesVv.latest().get(id); - - if (table == null) { - throw new IgniteInternalException(INTERNAL_ERR, - format("Table not found [tableId={}]", id)); - } - - return table; - } finally { - busyLock.leaveBusy(); - } - } - - public void registerListener(SchemaUpdateListener listener) { - listeners.add(listener); - } - - /** - * OnSqlTypeCreated. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ - public CompletableFuture<?> onTableCreated(String schemaName, int tableId, long causalityToken) { - if (!busyLock.enterBusy()) { - return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException())); - } - - try { - pkIdxReady.computeIfAbsent(tableId, k -> new CompletableFuture<>()); - - CompletableFuture<Map<Integer, IgniteTable>> updatedTables = tablesVv.update(causalityToken, (tables, e) -> - inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - return tableManager.tableAsync(causalityToken, tableId) - .thenCompose(table -> convert(causalityToken, table)) - .thenApply(igniteTable -> { - Map<Integer, IgniteTable> resTbls = new HashMap<>(tables); - - IgniteTable oldTable = resTbls.put(igniteTable.id(), igniteTable); - - // looks like this is UPDATE operation - if (oldTable != null) { - for (var index : oldTable.indexes().values()) { - igniteTable.addIndex(index); - } - } - - return resTbls; - }); - }) - ); - - schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - return updatedTables.thenApply(tables -> { - IgniteTable igniteTable = tables.get(tableId); - - Map<String, IgniteSchema> res = new HashMap<>(schemas); - - IgniteSchema schema = res.compute(schemaName, - (k, v) -> v == null ? new IgniteSchema(schemaName, causalityToken) : IgniteSchema.copy(v, causalityToken)); - - schema.addTable(igniteTable); - - return res; - }); - })); - - // calciteSchemaVv depends on all other Versioned Values and is completed last. - return calciteSchemaVv.get(causalityToken); - } finally { - busyLock.leaveBusy(); - } - } - - /** - * OnSqlTypeUpdated. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ - public CompletableFuture<?> onTableUpdated(String schemaName, int tableId, long causalityToken) { - return onTableCreated(schemaName, tableId, causalityToken); - } - - /** - * OnSqlTypeDropped. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ - public CompletableFuture<?> onTableDropped(String schemaName, int tableId, long causalityToken) { - if (!busyLock.enterBusy()) { - return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException())); - } - - try { - CompletableFuture<IgniteTable> removedTableFuture = new CompletableFuture<>(); - - tablesVv.update(causalityToken, (tables, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - removedTableFuture.completeExceptionally(e); - - return failedFuture(e); - } - - Map<Integer, IgniteTable> resTbls = new HashMap<>(tables); - - IgniteTable removedTable = resTbls.remove(tableId); - - removedTableFuture.complete(removedTable); - - return completedFuture(resTbls); - })); - - schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - return removedTableFuture.thenApply(table -> { - if (table == null) { - return schemas; - } - - Map<String, IgniteSchema> res = new HashMap<>(schemas); - - IgniteSchema schema = res.compute(schemaName, - (k, v) -> v == null ? new IgniteSchema(schemaName, causalityToken) : IgniteSchema.copy(v, causalityToken)); - - schema.removeTable(table.name()); - - pkIdxReady.remove(table.id()); - - return res; - }); - })); - - // calciteSchemaVv depends on all other Versioned Values and is completed last. - return calciteSchemaVv.get(causalityToken); - } finally { - busyLock.leaveBusy(); - } - } - - /** - * Rebuilds Calcite schemas. - * - * @param schemas Ignite schemas. - */ - private SchemaPlus rebuild(Map<String, IgniteSchema> schemas) { - SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false); - - newCalciteSchema.add(DEFAULT_SCHEMA_NAME, new IgniteSchema(DEFAULT_SCHEMA_NAME)); - - schemas.forEach(newCalciteSchema::add); - - return newCalciteSchema; - } - - private CompletableFuture<IgniteTableImpl> convert(long causalityToken, TableImpl table) { - return schemaManager.schemaRegistry(causalityToken, table.tableId()) - .thenApply(schemaRegistry -> inBusyLock(busyLock, () -> convert(table, schemaRegistry, causalityToken))); - } - - private IgniteTableImpl convert(TableImpl table, SchemaRegistry schemaRegistry, long schemaVersion) { - SchemaDescriptor descriptor = schemaRegistry.schema(); - - List<ColumnDescriptor> colDescriptors = descriptor.columnNames().stream() - .map(descriptor::column) - .sorted(Comparator.comparingInt(Column::columnOrder)) - .map(col -> new ColumnDescriptorImpl( - col.name(), - descriptor.isKeyColumn(col.schemaIndex()), - col.nullable(), - col.columnOrder(), - col.schemaIndex(), - col.type(), - convertDefaultValueProvider(col.defaultValueProvider()), - col::defaultValue - )) - .collect(Collectors.toList()); - - IntList colocationColumns = new IntArrayList(); - - for (Column column : descriptor.colocationColumns()) { - colocationColumns.add(column.columnOrder()); - } - - // TODO Use the actual zone ID after implementing https://issues.apache.org/jira/browse/IGNITE-18426. - IgniteDistribution distribution = IgniteDistributions.affinity(colocationColumns, table.tableId(), table.tableId()); - - InternalTable internalTable = table.internalTable(); - DoubleSupplier rowCount = IgniteTableImpl.rowCountStatistic(internalTable); - - return new IgniteTableImpl( - new TableDescriptorImpl(colDescriptors, distribution), - internalTable.tableId(), - internalTable.name(), - schemaRegistry.lastSchemaVersion(), - rowCount - ); - } - - private DefaultValueStrategy convertDefaultValueProvider(DefaultValueProvider defaultValueProvider) { - return defaultValueProvider.type() == Type.CONSTANT - ? DefaultValueStrategy.DEFAULT_CONSTANT - : DefaultValueStrategy.DEFAULT_COMPUTED; - } - - /** - * Index created callback method register index in Calcite schema. - * - * @param tableId Table ID. - * @param indexId Index ID. - * @param causalityToken Causality token. - * @return Schema registration future. - */ - public CompletableFuture<?> onIndexCreated(int tableId, int indexId, IndexDescriptor indexDescriptor, long causalityToken) { - if (!busyLock.enterBusy()) { - return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException())); - } - - try { - CompletableFuture<Map<Integer, IgniteIndex>> updatedIndices = indicesVv.update(causalityToken, (indices, e) -> - inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> { - var igniteIndex = new IgniteIndex(newIndex(table, indexId, indexDescriptor)); - - Map<Integer, IgniteIndex> resIdxs = new HashMap<>(indices); - - resIdxs.put(indexId, igniteIndex); - - return resIdxs; - }); - })); - - CompletableFuture<Map<Integer, IgniteTable>> updatedTables = tablesVv.update(causalityToken, (tables, e) -> - inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - return updatedIndices.thenApply(indices -> { - IgniteIndex igniteIndex = indices.get(indexId); - - Map<Integer, IgniteTable> resTbls = new HashMap<>(tables); - - IgniteTable igniteTable = resTbls.computeIfPresent(tableId, - (k, v) -> IgniteTableImpl.copyOf((IgniteTableImpl) v)); - - assert igniteTable != null : "Table " + tableId + " was not found"; - - igniteTable.addIndex(igniteIndex); - - return resTbls; - }); - })); - - schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - return updatedTables.thenCombine(updatedIndices, (tables, indices) -> inBusyLock(busyLock, () -> { - Map<String, IgniteSchema> res = new HashMap<>(schemas); - - IgniteSchema schema = res.compute(DEFAULT_SCHEMA_NAME, - (k, v) -> v == null ? new IgniteSchema(k, causalityToken) : IgniteSchema.copy(v, causalityToken)); - - schema.addTable(tables.get(tableId)); - - schema.addIndex(indexId, indices.get(indexId)); - - return res; - })); - })); - - // this stub is necessary for observing pk index creation. - schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> { - CompletableFuture<?> pkFut = pkIdxReady.get(tableId); - // this listener is called repeatedly on node stop. - if (pkFut != null) { - pkFut.complete(null); - } - }); - - return calciteSchemaVv.get(causalityToken); - } finally { - busyLock.leaveBusy(); - } - } - - private static Index<?> newIndex(TableImpl table, int indexId, IndexDescriptor descriptor) { - if (descriptor instanceof SortedIndexDescriptor) { - return new SortedIndexImpl(indexId, table.internalTable(), (SortedIndexDescriptor) descriptor); - } else { - return new HashIndex(indexId, table.internalTable(), descriptor); - } - } - - /** - * Index dropped callback method deregisters index from Calcite schema. - * - * @param schemaName Schema name. - * @param indexId Index id. - * @param causalityToken Causality token. - * @return Schema registration future. - */ - public CompletableFuture<?> onIndexDropped(String schemaName, int tableId, int indexId, long causalityToken) { - if (!busyLock.enterBusy()) { - return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException())); - } - - try { - CompletableFuture<IgniteIndex> removedIndexFuture = new CompletableFuture<>(); - - indicesVv.update(causalityToken, (indices, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - removedIndexFuture.completeExceptionally(e); - - return failedFuture(e); - } - - Map<Integer, IgniteIndex> resIdxs = new HashMap<>(indices); - - IgniteIndex rmvIdx = resIdxs.remove(indexId); - - removedIndexFuture.complete(rmvIdx); - - return completedFuture(resIdxs); - })); - - CompletableFuture<Map<Integer, IgniteTable>> updatedTables = tablesVv.update(causalityToken, (tables, e) -> - inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - Map<Integer, IgniteTable> resTbls = new HashMap<>(tables); - - IgniteTable table = resTbls.computeIfPresent(tableId, (k, v) -> IgniteTableImpl.copyOf((IgniteTableImpl) v)); - - if (table == null) { - return completedFuture(resTbls); - } else { - return removedIndexFuture.thenApply(rmvIndex -> { - table.removeIndex(rmvIndex.name()); - - return resTbls; - }); - } - })); - - schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> { - if (e != null) { - return failedFuture(e); - } - - Map<String, IgniteSchema> res = new HashMap<>(schemas); - - IgniteSchema schema = res.compute(schemaName, - (k, v) -> v == null ? new IgniteSchema(schemaName, causalityToken) : IgniteSchema.copy(v, causalityToken)); - - schema.removeIndex(indexId); - - return updatedTables.thenApply(tables -> { - IgniteTable table = tables.get(tableId); - - if (table != null) { - schema.addTable(tables.get(tableId)); - } - - return res; - }); - })); - - return calciteSchemaVv.get(causalityToken); - } finally { - busyLock.leaveBusy(); - } - } -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java index c6a8cc0743..37cae7bd55 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java @@ -52,7 +52,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory; import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder; -import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteCatalogSchema; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.util.ArrayUtils; @@ -222,7 +222,7 @@ public final class BaseQueryContext extends AbstractQueryContext { } public long schemaVersion() { - return Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).schemaVersion(); + return Objects.requireNonNull(schema().unwrap(IgniteCatalogSchema.class)).version(); } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java index 4ad5484257..6eac606773 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java @@ -26,7 +26,8 @@ import org.apache.ignite.internal.schema.NativeType; import org.apache.ignite.internal.schema.NativeTypeSpec; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; -import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; +import org.apache.ignite.internal.sql.engine.schema.IgniteCatalogSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.util.ColocationUtils; import org.apache.ignite.internal.util.HashCalculator; @@ -35,11 +36,11 @@ import org.apache.ignite.internal.util.HashCalculator; * Factory for creating a function to calculate the hash of the specified fields of a row. */ public class HashFunctionFactoryImpl<T> implements HashFunctionFactory<T> { - private final SqlSchemaManager sqlSchemaManager; private final RowHandler<T> rowHandler; + private final IgniteCatalogSchema schema; - public HashFunctionFactoryImpl(SqlSchemaManager sqlSchemaManager, RowHandler<T> rowHandler) { - this.sqlSchemaManager = sqlSchemaManager; + public HashFunctionFactoryImpl(IgniteCatalogSchema schema, RowHandler<T> rowHandler) { + this.schema = schema; this.rowHandler = rowHandler; } @@ -48,7 +49,15 @@ public class HashFunctionFactoryImpl<T> implements HashFunctionFactory<T> { public RowHashFunction<T> create(int[] fields, int tableId) { int fieldCnt = fields.length; NativeType[] fieldTypes = new NativeType[fieldCnt]; - TableDescriptor tblDesc = sqlSchemaManager.tableById(tableId).descriptor(); + //TODO: optimize this + TableDescriptor tblDesc = schema.getTableNames().stream() + .map(schema::getTable) + .map(IgniteTable.class::cast) + .filter(t -> t.id() == tableId) + .findFirst() + .orElseThrow(() -> new AssertionError("No table found: tableId=" + tableId)) + .descriptor(); + ImmutableIntList colocationColumns = tblDesc.distribution().getKeys(); assert colocationColumns.size() == fieldCnt : "fieldsCount=" + fieldCnt + ", colocationColumns=" + colocationColumns; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index ec61eca2fb..627c2e81a3 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -89,6 +89,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl; import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; +import org.apache.ignite.internal.sql.engine.schema.IgniteCatalogSchema; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; @@ -590,10 +591,6 @@ public class ExecutionServiceImplTest { when(topologyService.localMember()).thenReturn(clusterNode); - when(schemaManagerMock.tableById(anyInt())).thenReturn(table); - - when(schemaManagerMock.actualSchemaAsync(isA(long.class))).thenReturn(CompletableFuture.completedFuture(null)); - TestExecutableTableRegistry executableTableRegistry = new TestExecutableTableRegistry(); executableTableRegistry.setColocatioGroupProvider((tableId) -> { // Make sure the exception is handled properly if it occurs during the mapping phase. @@ -785,7 +782,7 @@ public class ExecutionServiceImplTest { MailboxRegistry mailboxRegistry, ExchangeService exchangeService, ResolvedDependencies deps) { - HashFunctionFactory<Object[]> funcFactory = new HashFunctionFactoryImpl<>(mock(SqlSchemaManager.class), ctx.rowHandler()); + HashFunctionFactory<Object[]> funcFactory = new HashFunctionFactoryImpl<>(ctx.getRootSchema().unwrap(IgniteCatalogSchema.class), ctx.rowHandler()); return new LogicalRelImplementor<>(ctx, funcFactory, mailboxRegistry, exchangeService, deps) { @Override diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java deleted file mode 100644 index eb63c18ebe..0000000000 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.engine.exec.schema; - -import static java.util.concurrent.CompletableFuture.allOf; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.LongFunction; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Table; -import org.apache.ignite.internal.index.Index; -import org.apache.ignite.internal.index.IndexDescriptor; -import org.apache.ignite.internal.schema.Column; -import org.apache.ignite.internal.schema.NativeTypes; -import org.apache.ignite.internal.schema.SchemaDescriptor; -import org.apache.ignite.internal.schema.SchemaManager; -import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl; -import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex; -import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; -import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; -import org.apache.ignite.internal.sql.engine.schema.IgniteTable; -import org.apache.ignite.internal.sql.engine.schema.IgniteTableImpl; -import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl; -import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.table.TableImpl; -import org.apache.ignite.internal.table.distributed.TableManager; -import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.apache.ignite.internal.util.IgniteSpinBusyLock; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; - -/** - * Tests to verify {@link SqlSchemaManagerImpl}. - */ -@ExtendWith(MockitoExtension.class) -public class SqlSchemaManagerTest extends BaseIgniteAbstractTest { - private final int tableId = 1; - - private final int indexId = 2; - - private final SchemaDescriptor schemaDescriptor = new SchemaDescriptor( - 1, - new Column[]{new Column(0, "ID", NativeTypes.INT64, false)}, - new Column[]{new Column(1, "VAL", NativeTypes.INT64, false)} - ); - - @Mock - private TableManager tableManager; - - @Mock - private SchemaManager schemaManager; - - @Mock - private TableImpl table; - - @Mock - private Index<IndexDescriptor> index; - - @Mock - private SchemaRegistryImpl schemaRegistry; - - private SqlSchemaManagerImpl sqlSchemaManager; - - private TestRevisionRegister testRevisionRegister; - - /** Busy lock for stop synchronisation. */ - private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); - - @BeforeEach - public void setup() { - Mockito.reset(tableManager); - - when(tableManager.tableAsync(anyLong(), eq(tableId))).thenReturn(completedFuture(table)); - - testRevisionRegister = new TestRevisionRegister(); - - sqlSchemaManager = new SqlSchemaManagerImpl( - tableManager, - schemaManager, - testRevisionRegister, - busyLock - ); - - testRevisionRegister.moveForward(); - } - - @Test - public void testOnTableDroppedHandler() { - InternalTable mock = mock(InternalTable.class); - when(mock.tableId()).thenReturn(tableId); - when(mock.name()).thenReturn("T"); - - when(table.internalTable()).thenReturn(mock); - when(table.tableId()).thenReturn(tableId); - when(schemaRegistry.schema()).thenReturn(schemaDescriptor); - when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version()); - - when(schemaManager.schemaRegistry(anyLong(), anyInt())).thenReturn(completedFuture(schemaRegistry)); - - sqlSchemaManager.onTableCreated("PUBLIC", tableId, testRevisionRegister.actualToken() + 1); - testRevisionRegister.moveForward(); - - TestHashIndex testHashIndex = TestHashIndex.create(List.of("ID"), "pk_idx", tableId); - - sqlSchemaManager.onIndexCreated( - testHashIndex.tableId(), - testHashIndex.id(), - testHashIndex.descriptor(), - testRevisionRegister.actualToken() + 1 - ); - - testRevisionRegister.moveForward(); - - Table schemaTable = sqlSchemaManager.schema("PUBLIC").getTable("T"); - - assertNotNull(schemaTable); - IgniteTableImpl igniteTable = assertInstanceOf(IgniteTableImpl.class, schemaTable); - assertEquals(tableId, igniteTable.id()); - - sqlSchemaManager.onTableDropped("PUBLIC", tableId, testRevisionRegister.actualToken() + 1); - testRevisionRegister.moveForward(); - - assertNull(sqlSchemaManager.schema("PUBLIC").getTable("T")); - } - - @Test - public void testIndexEventHandler() { - InternalTable mock = mock(InternalTable.class); - when(mock.tableId()).thenReturn(tableId); - when(mock.name()).thenReturn("T"); - - when(table.internalTable()).thenReturn(mock); - when(table.tableId()).thenReturn(tableId); - when(schemaRegistry.schema()).thenReturn(schemaDescriptor); - when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version()); - when(schemaManager.schemaRegistry(anyLong(), anyInt())).thenReturn(completedFuture(schemaRegistry)); - - sqlSchemaManager.onTableCreated("PUBLIC", tableId, testRevisionRegister.actualToken() + 1); - testRevisionRegister.moveForward(); - - TestHashIndex testHashIndex = TestHashIndex.create(List.of("ID"), "pk_idx", tableId); - - sqlSchemaManager.onIndexCreated( - testHashIndex.tableId(), - testHashIndex.id(), - testHashIndex.descriptor(), - testRevisionRegister.actualToken() + 1 - ); - - testRevisionRegister.moveForward(); - - assertEquals(1, ((IgniteTableImpl) sqlSchemaManager.schema("PUBLIC").getTable("T")).indexes().size()); - - IndexDescriptor descMock = mock(IndexDescriptor.class); - when(descMock.columns()).thenReturn(List.of()); - when(descMock.name()).thenReturn("PUBLIC.I"); - - sqlSchemaManager.onIndexCreated(tableId, indexId, descMock, testRevisionRegister.actualToken() + 1); - - testRevisionRegister.moveForward(); - - IgniteSchema schema = sqlSchemaManager.schema("PUBLIC").unwrap(IgniteSchema.class); - Table schemaTable = schema.getTable("T"); - IgniteIndex igniteIndex = schema.index(indexId); - - assertNotNull(igniteIndex); - - IgniteTableImpl igniteTable = assertInstanceOf(IgniteTableImpl.class, schemaTable); - - assertEquals(igniteTable.id(), igniteIndex.tableId()); - assertSame(igniteIndex, igniteTable.indexes().get("PUBLIC.I")); - - sqlSchemaManager.onIndexDropped("PUBLIC", igniteTable.id(), indexId, testRevisionRegister.actualToken() + 1); - testRevisionRegister.moveForward(); - - assertNull(sqlSchemaManager.schema("PUBLIC").unwrap(IgniteSchema.class).index(indexId)); - - verifyNoMoreInteractions(tableManager); - } - - - @Test - public void testIndexEventsProcessed() { - InternalTable mock = mock(InternalTable.class); - when(mock.tableId()).thenReturn(tableId); - when(mock.name()).thenReturn("T"); - - when(table.internalTable()).thenReturn(mock); - when(table.tableId()).thenReturn(tableId); - when(schemaRegistry.schema()).thenReturn(schemaDescriptor); - when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version()); - when(schemaManager.schemaRegistry(anyLong(), anyInt())).thenReturn(completedFuture(schemaRegistry)); - - sqlSchemaManager.onTableCreated("PUBLIC", table.tableId(), testRevisionRegister.actualToken() + 1); - testRevisionRegister.moveForward(); - - TestHashIndex testHashIndex = TestHashIndex.create(List.of("ID"), "pk_idx", tableId); - - sqlSchemaManager.onIndexCreated( - testHashIndex.tableId(), - testHashIndex.id(), - testHashIndex.descriptor(), - testRevisionRegister.actualToken() + 1 - ); - - testRevisionRegister.moveForward(); - - String idxName = "I"; - - IndexDescriptor descMock = mock(IndexDescriptor.class); - when(descMock.columns()).thenReturn(List.of()); - when(descMock.name()).thenReturn(idxName); - - { - SchemaPlus schema1 = sqlSchemaManager.schema("PUBLIC"); - - sqlSchemaManager.onIndexCreated(tableId, indexId, descMock, testRevisionRegister.actualToken() + 1); - testRevisionRegister.moveForward(); - - SchemaPlus schema2 = sqlSchemaManager.schema("PUBLIC"); - - // Validate schema snapshot. - assertNotSame(schema1, schema2); - assertNotSame(schema1.getTable("T"), schema2.getTable("T")); - - assertNull(schema1.unwrap(IgniteSchema.class).index(indexId)); - assertNotNull(schema2.unwrap(IgniteSchema.class).index(indexId)); - - assertNull(((IgniteTable) schema1.getTable("T")).getIndex(idxName)); - assertNotNull(((IgniteTable) schema2.getTable("T")).getIndex(idxName)); - } - { - sqlSchemaManager.onIndexDropped("PUBLIC", table.tableId(), indexId, testRevisionRegister.actualToken() + 1); - SchemaPlus schema1 = sqlSchemaManager.schema("PUBLIC"); - testRevisionRegister.moveForward(); - - SchemaPlus schema2 = sqlSchemaManager.schema("PUBLIC"); - - // Validate schema snapshot. - assertNotSame(schema1, schema2); - assertNotSame(schema1.getTable("T"), schema2.getTable("T")); - - assertNotNull(schema1.unwrap(IgniteSchema.class).index(indexId)); - assertNull(schema2.unwrap(IgniteSchema.class).index(indexId)); - - assertNull(((IgniteTable) schema2.getTable("T")).getIndex(idxName)); - assertNotNull(((IgniteTable) schema1.getTable("T")).getIndex(idxName)); - } - - verifyNoMoreInteractions(tableManager); - } - - /** - * Test revision register. - */ - private static class TestRevisionRegister implements Consumer<LongFunction<CompletableFuture<?>>> { - AtomicLong token = new AtomicLong(-1); - - /** Revision consumer. */ - private LongFunction<CompletableFuture<?>> moveRevision; - - /** - * Moves forward token. - */ - void moveForward() { - await(moveRevision.apply(token.incrementAndGet())); - } - - /** - * Gets an actual token. - * - * @return Actual token. - */ - long actualToken() { - return token.get(); - } - - /** {@inheritDoc} */ - @Override - public void accept(LongFunction<CompletableFuture<?>> function) { - if (moveRevision == null) { - moveRevision = function; - } else { - LongFunction<CompletableFuture<?>> old = moveRevision; - - moveRevision = rev -> allOf( - old.apply(rev), - function.apply(rev) - ); - } - } - } -} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java index 73ddc9098b..582087733d 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.sql.engine.framework; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; + import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -66,12 +68,6 @@ public class PredefinedSchemaManager implements SqlSchemaManager { } } - /** {@inheritDoc} */ - @Override - public SchemaPlus schema(@Nullable String schema) { - return schema == null ? root : root.getSubSchema(schema); - } - /** {@inheritDoc} */ @Override public SchemaPlus schema(String name, int version) { @@ -86,13 +82,11 @@ public class PredefinedSchemaManager implements SqlSchemaManager { /** {@inheritDoc} */ @Override - public SchemaPlus activeSchema(@Nullable String name, long timestamp) { + public SchemaPlus latestSchema(@Nullable String name) { return schema(name); } - /** {@inheritDoc} */ - @Override - public IgniteTable tableById(int id) { - return tableById.get(id); + private SchemaPlus schema(@Nullable String schemaName) { + return root.getSubSchema(schemaName == null ? DEFAULT_SCHEMA_NAME : schemaName); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index 88f270c581..031f7a8ac3 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.sql.engine.AsyncCursor; import org.apache.ignite.internal.sql.engine.QueryCancel; @@ -59,6 +58,7 @@ import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter; import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; +import org.apache.ignite.internal.sql.engine.schema.IgniteCatalogSchema; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.sql.ParsedResult; import org.apache.ignite.internal.sql.engine.sql.ParserService; @@ -131,7 +131,7 @@ public class TestNode implements LifecycleAware { dependencyResolver, (ctx, deps) -> new LogicalRelImplementor<Object[]>( ctx, - new HashFunctionFactoryImpl<>(schemaManager, rowHandler), + new HashFunctionFactoryImpl<>(ctx.getRootSchema().unwrap(IgniteCatalogSchema.class), rowHandler), mailboxRegistry, exchangeService, deps diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java index 18b64e3644..0fa0228799 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java @@ -99,7 +99,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); assertEquals(testSchema.name, schema.getName()); @@ -156,7 +156,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteSchemaTable table = getTable(schema, testTable); @@ -197,7 +197,7 @@ public class CatalogSqlSchemaManagerTest { SqlSchemaManager sqlSchemaManager = newSchemaManager(); { - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); assertEquals(DEFAULT_SCHEMA_NAME, schema.getName()); @@ -205,7 +205,7 @@ public class CatalogSqlSchemaManagerTest { } { - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(null, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(null); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); assertEquals(DEFAULT_SCHEMA_NAME, schema.getName()); @@ -234,7 +234,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteTable table = getTable(schema, testTable); @@ -274,7 +274,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteTable table = (IgniteTable) schema.getTable(testTable.name); @@ -302,7 +302,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteTable table = (IgniteTable) schema.getTable(testTable.name); @@ -336,7 +336,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteTable table = (IgniteTable) schema.getTable(testTable.name); @@ -369,7 +369,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteSchemaTable table = (IgniteSchemaTable) schema.getTable(testTable.name); @@ -413,7 +413,7 @@ public class CatalogSqlSchemaManagerTest { testSchema.init(catalogManager); SqlSchemaManager sqlSchemaManager = newSchemaManager(); - SchemaPlus schemaPlus = sqlSchemaManager.activeSchema(testSchema.name, testSchema.timestamp); + SchemaPlus schemaPlus = sqlSchemaManager.latestSchema(testSchema.name); IgniteCatalogSchema schema = unwrapSchema(schemaPlus); IgniteSchemaTable table = (IgniteSchemaTable) schema.getTable(testTable.name); @@ -469,7 +469,7 @@ public class CatalogSqlSchemaManagerTest { void init(CatalogManager catalogManager) { CatalogSchemaDescriptor schemaDescriptor = newSchemaDescriptor(version); - when(catalogManager.activeCatalogVersion(timestamp)).thenReturn(version); + when(catalogManager.activeCatalogVersion(Long.MAX_VALUE)).thenReturn(version); when(catalogManager.schema(name != null ? name : DEFAULT_SCHEMA_NAME, version)).thenReturn(schemaDescriptor); }
