This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 5497b714347  [FLINK-38030][table-planner] Fix cast exception for TableLineageDatasetImpl when the catalog is not a AbstractCatalog (#26727)
5497b714347 is described below

commit 5497b714347d017522daa4b96e9d0c8de2a1cd7b
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Wed Jul 2 09:27:14 2025 +0800

    [FLINK-38030][table-planner] Fix cast exception for TableLineageDatasetImpl when the catalog is not a AbstractCatalog (#26727)
---
 .../planner/lineage/TableLineageDatasetImpl.java    |   4 +-
 .../planner/factories/PlainTestCatalogFactory.java  | 372 +++++++++++++++++++++
 .../planner/lineage/TableLineageUtilsTest.java      | 117 +++++++
 .../plan/common/TableLineageGraphTestBase.java      |  52 +++
 .../org.apache.flink.table.factories.Factory        |   3 +-
 .../lineage-graph/plain-catalog-batch.json          |  56 ++++
 .../lineage-graph/plain-catalog-stream.json         |  56 ++++
 .../lineage-graph/without-catalog-batch.json        |  56 ++++
 .../lineage-graph/without-catalog-stream.json       |  56 ++++
 9 files changed, 768 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
index 98d0274e689..ce000dc9458 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.lineage;
 
 import org.apache.flink.streaming.api.lineage.LineageDataset;
 import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
-import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -49,8 +48,7 @@ public class TableLineageDatasetImpl implements TableLineageDataset {
         this.catalogContext =
                 CatalogContext.createContext(
                         contextResolvedTable.getCatalog().isPresent()
-                                ? ((AbstractCatalog) contextResolvedTable.getCatalog().get())
-                                        .getName()
+                                ? contextResolvedTable.getIdentifier().getCatalogName()
                                 : "",
                         contextResolvedTable.getCatalog().orElse(null));
         this.catalogBaseTable = contextResolvedTable.getTable();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/PlainTestCatalogFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/PlainTestCatalogFactory.java
new file mode 100644
index 00000000000..41cb6ef0a89
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/PlainTestCatalogFactory.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.factories; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is a test catalog factory to create a catalog that does not extend {@link AbstractCatalog}. 
+ */ +public class PlainTestCatalogFactory implements CatalogFactory { + + public static final String IDENTIFIER = "test-plain-catalog"; + + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .noDefaultValue(); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(DEFAULT_DATABASE); + return options; + } + + @Override + public Catalog createCatalog(CatalogFactory.Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + + return new TestCatalog(context.getName(), helper.getOptions().get(DEFAULT_DATABASE)); + } + + /** Test catalog that does not extend {@link AbstractCatalog}. */ + public static class TestCatalog implements Catalog { + private final GenericInMemoryCatalog innerCatalog; + + public TestCatalog(String name, String defaultDatabase) { + this.innerCatalog = new GenericInMemoryCatalog(name, defaultDatabase); + } + + @Override + public void open() throws CatalogException { + innerCatalog.open(); + } + + @Override + public void close() throws CatalogException { + innerCatalog.close(); + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return innerCatalog.getDefaultDatabase(); + } + + @Override + public List<String> listDatabases() throws CatalogException { + return innerCatalog.listDatabases(); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + return innerCatalog.getDatabase(databaseName); + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return innerCatalog.databaseExists(databaseName); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + innerCatalog.createDatabase(name, database, ignoreIfExists); + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + innerCatalog.dropDatabase(name, cascade); + } + + @Override + public void alterDatabase( + String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + innerCatalog.alterDatabase(name, newDatabase, ignoreIfNotExists); + } + + @Override + public List<String> listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + return innerCatalog.listTables(databaseName); + } + + @Override + public List<String> listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + return innerCatalog.listViews(databaseName); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return innerCatalog.getTable(tablePath); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + return innerCatalog.tableExists(tablePath); + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + 
innerCatalog.dropTable(tablePath, ignoreIfNotExists); + } + + @Override + public void renameTable( + ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + innerCatalog.renameTable(tablePath, newTableName, ignoreIfNotExists); + } + + @Override + public void createTable( + ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + innerCatalog.createTable(tablePath, table, ignoreIfExists); + } + + @Override + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + innerCatalog.alterTable(tablePath, newTable, ignoreIfNotExists); + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return innerCatalog.listPartitions(tablePath); + } + + @Override + public List<CatalogPartitionSpec> listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, + TableNotPartitionedException, + PartitionSpecInvalidException, + CatalogException { + return innerCatalog.listPartitions(tablePath, partitionSpec); + } + + @Override + public List<CatalogPartitionSpec> listPartitionsByFilter( + ObjectPath tablePath, List<Expression> filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return innerCatalog.listPartitionsByFilter(tablePath, filters); + } + + @Override + public CatalogPartition getPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return innerCatalog.getPartition(tablePath, partitionSpec); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + return innerCatalog.partitionExists(tablePath, partitionSpec); + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, + TableNotPartitionedException, + PartitionSpecInvalidException, + PartitionAlreadyExistsException, + CatalogException { + innerCatalog.createPartition(tablePath, partitionSpec, partition, ignoreIfExists); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + innerCatalog.dropPartition(tablePath, partitionSpec, ignoreIfNotExists); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + innerCatalog.alterPartition(tablePath, partitionSpec, newPartition, ignoreIfNotExists); + } + + @Override + public List<String> listFunctions(String dbName) + throws DatabaseNotExistException, CatalogException { + return innerCatalog.listFunctions(dbName); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + return innerCatalog.getFunction(functionPath); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return 
innerCatalog.functionExists(functionPath); + } + + @Override + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + innerCatalog.createFunction(functionPath, function, ignoreIfExists); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + innerCatalog.alterFunction(functionPath, newFunction, ignoreIfNotExists); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + innerCatalog.dropFunction(functionPath, ignoreIfNotExists); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return innerCatalog.getTableStatistics(tablePath); + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return innerCatalog.getTableColumnStatistics(tablePath); + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return innerCatalog.getPartitionStatistics(tablePath, partitionSpec); + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return innerCatalog.getPartitionColumnStatistics(tablePath, partitionSpec); + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, + CatalogTableStatistics tableStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + innerCatalog.alterTableStatistics(tablePath, tableStatistics, ignoreIfNotExists); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + innerCatalog.alterTableColumnStatistics(tablePath, columnStatistics, ignoreIfNotExists); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + innerCatalog.alterPartitionStatistics( + tablePath, partitionSpec, partitionStatistics, ignoreIfNotExists); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + innerCatalog.alterPartitionColumnStatistics( + tablePath, partitionSpec, columnStatistics, ignoreIfNotExists); + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java new file mode 100644 index 00000000000..35d300ea35e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.lineage; + +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.planner.lineage.TableLineageUtils.createTableLineageDataset; +import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link TableLineageUtils}. 
*/ +class TableLineageUtilsTest { + + private static final Catalog CATALOG = new GenericInMemoryCatalog(DEFAULT_CATALOG, "db1"); + private static final ResolvedSchema CATALOG_TABLE_RESOLVED_SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("a", DataTypes.STRING()), + Column.physical("b", DataTypes.INT()), + Column.physical("c", DataTypes.BOOLEAN())), + Collections.emptyList(), + null, + Collections.emptyList()); + private static final Schema CATALOG_TABLE_SCHEMA = + Schema.newBuilder().fromResolvedSchema(CATALOG_TABLE_RESOLVED_SCHEMA).build(); + private static final Map<String, String> TABLE_OPTIONS = new HashMap<>(); + + static { + TABLE_OPTIONS.put("a", "1"); + TABLE_OPTIONS.put("b", "10"); + TABLE_OPTIONS.put("d", "4"); + } + + @Test + void testCreateTableLineageDatasetWithCatalog() { + final ObjectIdentifier objectIdentifier = + ObjectIdentifier.of(DEFAULT_CATALOG, "my_db", "my_permanent_table"); + final ContextResolvedTable resolvedTable = + ContextResolvedTable.permanent( + objectIdentifier, + CATALOG, + new ResolvedCatalogTable( + CatalogTable.newBuilder() + .schema(CATALOG_TABLE_SCHEMA) + .comment("my table") + .partitionKeys(Collections.emptyList()) + .options(TABLE_OPTIONS) + .build(), + CATALOG_TABLE_RESOLVED_SCHEMA)); + + LineageDataset lineageDataset = createTableLineageDataset(resolvedTable, Optional.empty()); + assertThat(lineageDataset).isInstanceOf(TableLineageDatasetImpl.class); + + TableLineageDatasetImpl tableLineageDataset = (TableLineageDatasetImpl) lineageDataset; + assertThat(tableLineageDataset.catalogContext().getCatalogName()) + .isEqualTo(DEFAULT_CATALOG); + assertThat(tableLineageDataset.name()).isEqualTo(objectIdentifier.asSummaryString()); + } + + @Test + void testCreateTableLineageDatasetWithoutCatalog() { + final ObjectIdentifier objectIdentifier = + ObjectIdentifier.of("default_cat", "default_db", "my_temporary_table"); + final ContextResolvedTable resolvedTable = + ContextResolvedTable.temporary( + objectIdentifier, + new ResolvedCatalogTable( + CatalogTable.newBuilder() + .schema(CATALOG_TABLE_SCHEMA) + .comment("my table") + .partitionKeys(Collections.emptyList()) + .options(TABLE_OPTIONS) + .build(), + CATALOG_TABLE_RESOLVED_SCHEMA)); + + LineageDataset lineageDataset = createTableLineageDataset(resolvedTable, Optional.empty()); + assertThat(lineageDataset).isInstanceOf(TableLineageDatasetImpl.class); + + TableLineageDatasetImpl tableLineageDataset = (TableLineageDatasetImpl) lineageDataset; + assertThat(tableLineageDataset.catalogContext().getCatalogName()).isEmpty(); + assertThat(tableLineageDataset.name()).isEqualTo(objectIdentifier.asSummaryString()); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java index fc097cd51e4..4bc2ad47cdf 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java @@ -133,6 +133,58 @@ public abstract class TableLineageGraphTestBase extends TableTestBase { verify(lineageGraph, isBatchMode() ? 
"union-batch.json" : "union-stream.json"); } + @Test + void testWithNonAbstractCatalog() throws Exception { + util.getTableEnv() + .executeSql( + "CREATE CATALOG MyPlainCat with ( " + + " 'type' = 'test-plain-catalog', " + + " 'default-database' = 'MyDatabase')"); + util.getTableEnv().executeSql("USE CATALOG MyPlainCat"); + + util.getTableEnv() + .executeSql( + "CREATE TABLE Src (\n" + + " a BIGINT,\n" + + " b INT NOT NULL\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = '" + + isBatchMode() + + "')"); + + util.getTableEnv() + .executeSql( + "CREATE TABLE Snk (\n" + + " b INT NOT NULL,\n" + + " a BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = '" + + isBatchMode() + + "')"); + + List<Transformation<?>> transformations = + util.generateTransformations("INSERT INTO Snk SELECT b, a FROM Src"); + LineageGraph lineageGraph = generateLineageGraph(transformations); + verify( + lineageGraph, + isBatchMode() ? "plain-catalog-batch.json" : "plain-catalog-stream.json"); + } + + @Test + void testWithoutCatalog() throws Exception { + util.getTableEnv().executeSql("CREATE TEMPORARY TABLE Src LIKE FirstTable"); + util.getTableEnv().executeSql("CREATE TEMPORARY TABLE Snk LIKE SecondTable"); + + List<Transformation<?>> transformations = + util.generateTransformations("INSERT INTO Snk SELECT * FROM Src"); + LineageGraph lineageGraph = generateLineageGraph(transformations); + verify( + lineageGraph, + isBatchMode() ? "without-catalog-batch.json" : "without-catalog-stream.json"); + } + private LineageGraph generateLineageGraph(List<Transformation<?>> transformations) { StreamGraphGenerator streamGraphGenerator = new StreamGraphGenerator( diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory index a7640c70f42..30b07f2bd1e 100644 --- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -22,4 +22,5 @@ org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory org.apache.flink.table.planner.factories.TestProcedureCatalogFactory org.apache.flink.table.planner.utils.TestSimpleDynamicTableSourceFactory -org.apache.flink.table.planner.factories.TestValuesModelFactory \ No newline at end of file +org.apache.flink.table.planner.factories.TestValuesModelFactory +org.apache.flink.table.planner.factories.PlainTestCatalogFactory \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-batch.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-batch.json new file mode 100644 index 00000000000..81dc3106163 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-batch.json @@ -0,0 +1,56 @@ +{ + "lineageEdges" : [ { + "sourceLineageVertex" : { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Src", + "fullName" : "MyDatabase.Src" + }, + "facets" : { } + } ], + "boundedness" : "BOUNDED" + }, + "sinkVertex" : { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Snk", + 
"namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Snk", + "fullName" : "MyDatabase.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } + } ], + "sources" : [ { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Src", + "fullName" : "MyDatabase.Src" + }, + "facets" : { } + } ], + "boundedness" : "BOUNDED" + } ], + "sinks" : [ { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Snk", + "fullName" : "MyDatabase.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-stream.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-stream.json new file mode 100644 index 00000000000..723f1447327 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-stream.json @@ -0,0 +1,56 @@ +{ + "lineageEdges" : [ { + "sourceLineageVertex" : { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Src", + "fullName" : "MyDatabase.Src" + }, + "facets" : { } + } ], + "boundedness" : "CONTINUOUS_UNBOUNDED" + }, + "sinkVertex" : { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Snk", + "fullName" : "MyDatabase.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } + } ], + "sources" : [ { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Src", + "fullName" : "MyDatabase.Src" + }, + "facets" : { } + } ], + "boundedness" : "CONTINUOUS_UNBOUNDED" + } ], + "sinks" : [ { + "datasets" : [ { + "name" : "MyPlainCat.MyDatabase.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "MyDatabase", + "objectName" : "Snk", + "fullName" : "MyDatabase.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-batch.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-batch.json new file mode 100644 index 00000000000..8f818ef4449 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-batch.json @@ -0,0 +1,56 @@ +{ + "lineageEdges" : [ { + "sourceLineageVertex" : { + "datasets" : [ { + "name" : "default_catalog.default_database.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Src", + "fullName" : "default_database.Src" + }, + "facets" : { } + } ], + "boundedness" : "BOUNDED" + }, + "sinkVertex" : { + "datasets" : [ { + "name" : "default_catalog.default_database.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Snk", + "fullName" : "default_database.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" 
+ } + } ], + "sources" : [ { + "datasets" : [ { + "name" : "default_catalog.default_database.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Src", + "fullName" : "default_database.Src" + }, + "facets" : { } + } ], + "boundedness" : "BOUNDED" + } ], + "sinks" : [ { + "datasets" : [ { + "name" : "default_catalog.default_database.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Snk", + "fullName" : "default_database.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-stream.json b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-stream.json new file mode 100644 index 00000000000..86549608e0f --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-stream.json @@ -0,0 +1,56 @@ +{ + "lineageEdges" : [ { + "sourceLineageVertex" : { + "datasets" : [ { + "name" : "default_catalog.default_database.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Src", + "fullName" : "default_database.Src" + }, + "facets" : { } + } ], + "boundedness" : "CONTINUOUS_UNBOUNDED" + }, + "sinkVertex" : { + "datasets" : [ { + "name" : "default_catalog.default_database.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Snk", + "fullName" : "default_database.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } + } ], + "sources" : [ { + "datasets" : [ { + "name" : "default_catalog.default_database.Src", + "namespace" : "values://FromElementsFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Src", + "fullName" : "default_database.Src" + }, + "facets" : { } + } ], + "boundedness" : "CONTINUOUS_UNBOUNDED" + } ], + "sinks" : [ { + "datasets" : [ { + "name" : "default_catalog.default_database.Snk", + "namespace" : "values://AppendingSinkFunction", + "objectPath" : { + "databaseName" : "default_database", + "objectName" : "Snk", + "fullName" : "default_database.Snk" + }, + "facets" : { } + } ], + "modifyType" : "INSERT" + } ] +} \ No newline at end of file