This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5497b714347 [FLINK-38030][table-planner] Fix cast exception for 
TableLineageDatasetImpl when the catalog is not a AbstractCatalog (#26727)
5497b714347 is described below

commit 5497b714347d017522daa4b96e9d0c8de2a1cd7b
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Wed Jul 2 09:27:14 2025 +0800

    [FLINK-38030][table-planner] Fix cast exception for TableLineageDatasetImpl 
when the catalog is not a AbstractCatalog (#26727)
---
 .../planner/lineage/TableLineageDatasetImpl.java   |   4 +-
 .../planner/factories/PlainTestCatalogFactory.java | 372 +++++++++++++++++++++
 .../planner/lineage/TableLineageUtilsTest.java     | 117 +++++++
 .../plan/common/TableLineageGraphTestBase.java     |  52 +++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../lineage-graph/plain-catalog-batch.json         |  56 ++++
 .../lineage-graph/plain-catalog-stream.json        |  56 ++++
 .../lineage-graph/without-catalog-batch.json       |  56 ++++
 .../lineage-graph/without-catalog-stream.json      |  56 ++++
 9 files changed, 768 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
index 98d0274e689..ce000dc9458 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.lineage;
 
 import org.apache.flink.streaming.api.lineage.LineageDataset;
 import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
-import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -49,8 +48,7 @@ public class TableLineageDatasetImpl implements 
TableLineageDataset {
         this.catalogContext =
                 CatalogContext.createContext(
                         contextResolvedTable.getCatalog().isPresent()
-                                ? ((AbstractCatalog) 
contextResolvedTable.getCatalog().get())
-                                        .getName()
+                                ? 
contextResolvedTable.getIdentifier().getCatalogName()
                                 : "",
                         contextResolvedTable.getCatalog().orElse(null));
         this.catalogBaseTable = contextResolvedTable.getTable();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/PlainTestCatalogFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/PlainTestCatalogFactory.java
new file mode 100644
index 00000000000..41cb6ef0a89
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/PlainTestCatalogFactory.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is a test catalog factory to create a catalog that does not extend 
{@link AbstractCatalog}.
+ */
+public class PlainTestCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "test-plain-catalog";
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .noDefaultValue();
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        return options;
+    }
+
+    @Override
+    public Catalog createCatalog(CatalogFactory.Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+
+        return new TestCatalog(context.getName(), 
helper.getOptions().get(DEFAULT_DATABASE));
+    }
+
+    /** Test catalog that does not extend {@link AbstractCatalog}. */
+    public static class TestCatalog implements Catalog {
+        private final GenericInMemoryCatalog innerCatalog;
+
+        public TestCatalog(String name, String defaultDatabase) {
+            this.innerCatalog = new GenericInMemoryCatalog(name, 
defaultDatabase);
+        }
+
+        @Override
+        public void open() throws CatalogException {
+            innerCatalog.open();
+        }
+
+        @Override
+        public void close() throws CatalogException {
+            innerCatalog.close();
+        }
+
+        @Override
+        public String getDefaultDatabase() throws CatalogException {
+            return innerCatalog.getDefaultDatabase();
+        }
+
+        @Override
+        public List<String> listDatabases() throws CatalogException {
+            return innerCatalog.listDatabases();
+        }
+
+        @Override
+        public CatalogDatabase getDatabase(String databaseName)
+                throws DatabaseNotExistException, CatalogException {
+            return innerCatalog.getDatabase(databaseName);
+        }
+
+        @Override
+        public boolean databaseExists(String databaseName) throws 
CatalogException {
+            return innerCatalog.databaseExists(databaseName);
+        }
+
+        @Override
+        public void createDatabase(String name, CatalogDatabase database, 
boolean ignoreIfExists)
+                throws DatabaseAlreadyExistException, CatalogException {
+            innerCatalog.createDatabase(name, database, ignoreIfExists);
+        }
+
+        @Override
+        public void dropDatabase(String name, boolean ignoreIfNotExists, 
boolean cascade)
+                throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+            innerCatalog.dropDatabase(name, ignoreIfNotExists, cascade);
+        }
+
+        @Override
+        public void alterDatabase(
+                String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+                throws DatabaseNotExistException, CatalogException {
+            innerCatalog.alterDatabase(name, newDatabase, ignoreIfNotExists);
+        }
+
+        @Override
+        public List<String> listTables(String databaseName)
+                throws DatabaseNotExistException, CatalogException {
+            return innerCatalog.listTables(databaseName);
+        }
+
+        @Override
+        public List<String> listViews(String databaseName)
+                throws DatabaseNotExistException, CatalogException {
+            return innerCatalog.listViews(databaseName);
+        }
+
+        @Override
+        public CatalogBaseTable getTable(ObjectPath tablePath)
+                throws TableNotExistException, CatalogException {
+            return innerCatalog.getTable(tablePath);
+        }
+
+        @Override
+        public boolean tableExists(ObjectPath tablePath) throws 
CatalogException {
+            return innerCatalog.tableExists(tablePath);
+        }
+
+        @Override
+        public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            innerCatalog.dropTable(tablePath, ignoreIfNotExists);
+        }
+
+        @Override
+        public void renameTable(
+                ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
+                throws TableNotExistException, TableAlreadyExistException, 
CatalogException {
+            innerCatalog.renameTable(tablePath, newTableName, 
ignoreIfNotExists);
+        }
+
+        @Override
+        public void createTable(
+                ObjectPath tablePath, CatalogBaseTable table, boolean 
ignoreIfExists)
+                throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+            innerCatalog.createTable(tablePath, table, ignoreIfExists);
+        }
+
+        @Override
+        public void alterTable(
+                ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            innerCatalog.alterTable(tablePath, newTable, ignoreIfNotExists);
+        }
+
+        @Override
+        public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+                throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+            return innerCatalog.listPartitions(tablePath);
+        }
+
+        @Override
+        public List<CatalogPartitionSpec> listPartitions(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws TableNotExistException,
+                        TableNotPartitionedException,
+                        PartitionSpecInvalidException,
+                        CatalogException {
+            return innerCatalog.listPartitions(tablePath, partitionSpec);
+        }
+
+        @Override
+        public List<CatalogPartitionSpec> listPartitionsByFilter(
+                ObjectPath tablePath, List<Expression> filters)
+                throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+            return innerCatalog.listPartitionsByFilter(tablePath, filters);
+        }
+
+        @Override
+        public CatalogPartition getPartition(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws PartitionNotExistException, CatalogException {
+            return innerCatalog.getPartition(tablePath, partitionSpec);
+        }
+
+        @Override
+        public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+                throws CatalogException {
+            return innerCatalog.partitionExists(tablePath, partitionSpec);
+        }
+
+        @Override
+        public void createPartition(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogPartition partition,
+                boolean ignoreIfExists)
+                throws TableNotExistException,
+                        TableNotPartitionedException,
+                        PartitionSpecInvalidException,
+                        PartitionAlreadyExistsException,
+                        CatalogException {
+            innerCatalog.createPartition(tablePath, partitionSpec, partition, 
ignoreIfExists);
+        }
+
+        @Override
+        public void dropPartition(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec, 
boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            innerCatalog.dropPartition(tablePath, partitionSpec, 
ignoreIfNotExists);
+        }
+
+        @Override
+        public void alterPartition(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogPartition newPartition,
+                boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            innerCatalog.alterPartition(tablePath, partitionSpec, 
newPartition, ignoreIfNotExists);
+        }
+
+        @Override
+        public List<String> listFunctions(String dbName)
+                throws DatabaseNotExistException, CatalogException {
+            return innerCatalog.listFunctions(dbName);
+        }
+
+        @Override
+        public CatalogFunction getFunction(ObjectPath functionPath)
+                throws FunctionNotExistException, CatalogException {
+            return innerCatalog.getFunction(functionPath);
+        }
+
+        @Override
+        public boolean functionExists(ObjectPath functionPath) throws 
CatalogException {
+            return innerCatalog.functionExists(functionPath);
+        }
+
+        @Override
+        public void createFunction(
+                ObjectPath functionPath, CatalogFunction function, boolean 
ignoreIfExists)
+                throws FunctionAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+            innerCatalog.createFunction(functionPath, function, 
ignoreIfExists);
+        }
+
+        @Override
+        public void alterFunction(
+                ObjectPath functionPath, CatalogFunction newFunction, boolean 
ignoreIfNotExists)
+                throws FunctionNotExistException, CatalogException {
+            innerCatalog.alterFunction(functionPath, newFunction, 
ignoreIfNotExists);
+        }
+
+        @Override
+        public void dropFunction(ObjectPath functionPath, boolean 
ignoreIfNotExists)
+                throws FunctionNotExistException, CatalogException {
+            innerCatalog.dropFunction(functionPath, ignoreIfNotExists);
+        }
+
+        @Override
+        public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+                throws TableNotExistException, CatalogException {
+            return innerCatalog.getTableStatistics(tablePath);
+        }
+
+        @Override
+        public CatalogColumnStatistics getTableColumnStatistics(ObjectPath 
tablePath)
+                throws TableNotExistException, CatalogException {
+            return innerCatalog.getTableColumnStatistics(tablePath);
+        }
+
+        @Override
+        public CatalogTableStatistics getPartitionStatistics(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws PartitionNotExistException, CatalogException {
+            return innerCatalog.getPartitionStatistics(tablePath, 
partitionSpec);
+        }
+
+        @Override
+        public CatalogColumnStatistics getPartitionColumnStatistics(
+                ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                throws PartitionNotExistException, CatalogException {
+            return innerCatalog.getPartitionColumnStatistics(tablePath, 
partitionSpec);
+        }
+
+        @Override
+        public void alterTableStatistics(
+                ObjectPath tablePath,
+                CatalogTableStatistics tableStatistics,
+                boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            innerCatalog.alterTableStatistics(tablePath, tableStatistics, 
ignoreIfNotExists);
+        }
+
+        @Override
+        public void alterTableColumnStatistics(
+                ObjectPath tablePath,
+                CatalogColumnStatistics columnStatistics,
+                boolean ignoreIfNotExists)
+                throws TableNotExistException, CatalogException {
+            innerCatalog.alterTableColumnStatistics(tablePath, 
columnStatistics, ignoreIfNotExists);
+        }
+
+        @Override
+        public void alterPartitionStatistics(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogTableStatistics partitionStatistics,
+                boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            innerCatalog.alterPartitionStatistics(
+                    tablePath, partitionSpec, partitionStatistics, 
ignoreIfNotExists);
+        }
+
+        @Override
+        public void alterPartitionColumnStatistics(
+                ObjectPath tablePath,
+                CatalogPartitionSpec partitionSpec,
+                CatalogColumnStatistics columnStatistics,
+                boolean ignoreIfNotExists)
+                throws PartitionNotExistException, CatalogException {
+            innerCatalog.alterPartitionColumnStatistics(
+                    tablePath, partitionSpec, columnStatistics, 
ignoreIfNotExists);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java
new file mode 100644
index 00000000000..35d300ea35e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.planner.lineage.TableLineageUtils.createTableLineageDataset;
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TableLineageUtils}. */
+class TableLineageUtilsTest {
+
+    private static final Catalog CATALOG = new 
GenericInMemoryCatalog(DEFAULT_CATALOG, "db1");
+    private static final ResolvedSchema CATALOG_TABLE_RESOLVED_SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("a", DataTypes.STRING()),
+                            Column.physical("b", DataTypes.INT()),
+                            Column.physical("c", DataTypes.BOOLEAN())),
+                    Collections.emptyList(),
+                    null,
+                    Collections.emptyList());
+    private static final Schema CATALOG_TABLE_SCHEMA =
+            
Schema.newBuilder().fromResolvedSchema(CATALOG_TABLE_RESOLVED_SCHEMA).build();
+    private static final Map<String, String> TABLE_OPTIONS = new HashMap<>();
+
+    static {
+        TABLE_OPTIONS.put("a", "1");
+        TABLE_OPTIONS.put("b", "10");
+        TABLE_OPTIONS.put("d", "4");
+    }
+
+    @Test
+    void testCreateTableLineageDatasetWithCatalog() {
+        final ObjectIdentifier objectIdentifier =
+                ObjectIdentifier.of(DEFAULT_CATALOG, "my_db", 
"my_permanent_table");
+        final ContextResolvedTable resolvedTable =
+                ContextResolvedTable.permanent(
+                        objectIdentifier,
+                        CATALOG,
+                        new ResolvedCatalogTable(
+                                CatalogTable.newBuilder()
+                                        .schema(CATALOG_TABLE_SCHEMA)
+                                        .comment("my table")
+                                        .partitionKeys(Collections.emptyList())
+                                        .options(TABLE_OPTIONS)
+                                        .build(),
+                                CATALOG_TABLE_RESOLVED_SCHEMA));
+
+        LineageDataset lineageDataset = 
createTableLineageDataset(resolvedTable, Optional.empty());
+        assertThat(lineageDataset).isInstanceOf(TableLineageDatasetImpl.class);
+
+        TableLineageDatasetImpl tableLineageDataset = 
(TableLineageDatasetImpl) lineageDataset;
+        assertThat(tableLineageDataset.catalogContext().getCatalogName())
+                .isEqualTo(DEFAULT_CATALOG);
+        
assertThat(tableLineageDataset.name()).isEqualTo(objectIdentifier.asSummaryString());
+    }
+
+    @Test
+    void testCreateTableLineageDatasetWithoutCatalog() {
+        final ObjectIdentifier objectIdentifier =
+                ObjectIdentifier.of("default_cat", "default_db", 
"my_temporary_table");
+        final ContextResolvedTable resolvedTable =
+                ContextResolvedTable.temporary(
+                        objectIdentifier,
+                        new ResolvedCatalogTable(
+                                CatalogTable.newBuilder()
+                                        .schema(CATALOG_TABLE_SCHEMA)
+                                        .comment("my table")
+                                        .partitionKeys(Collections.emptyList())
+                                        .options(TABLE_OPTIONS)
+                                        .build(),
+                                CATALOG_TABLE_RESOLVED_SCHEMA));
+
+        LineageDataset lineageDataset = 
createTableLineageDataset(resolvedTable, Optional.empty());
+        assertThat(lineageDataset).isInstanceOf(TableLineageDatasetImpl.class);
+
+        TableLineageDatasetImpl tableLineageDataset = 
(TableLineageDatasetImpl) lineageDataset;
+        
assertThat(tableLineageDataset.catalogContext().getCatalogName()).isEmpty();
+        
assertThat(tableLineageDataset.name()).isEqualTo(objectIdentifier.asSummaryString());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
index fc097cd51e4..4bc2ad47cdf 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/TableLineageGraphTestBase.java
@@ -133,6 +133,58 @@ public abstract class TableLineageGraphTestBase extends 
TableTestBase {
         verify(lineageGraph, isBatchMode() ? "union-batch.json" : 
"union-stream.json");
     }
 
+    @Test
+    void testWithNonAbstractCatalog() throws Exception {
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE CATALOG MyPlainCat with ( "
+                                + " 'type' = 'test-plain-catalog', "
+                                + " 'default-database' = 'MyDatabase')");
+        util.getTableEnv().executeSql("USE CATALOG MyPlainCat");
+
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE TABLE Src (\n"
+                                + "  a BIGINT,\n"
+                                + "  b INT NOT NULL\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = '"
+                                + isBatchMode()
+                                + "')");
+
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE TABLE Snk (\n"
+                                + "  b INT NOT NULL,\n"
+                                + "  a BIGINT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = '"
+                                + isBatchMode()
+                                + "')");
+
+        List<Transformation<?>> transformations =
+                util.generateTransformations("INSERT INTO Snk SELECT b, a FROM 
Src");
+        LineageGraph lineageGraph = generateLineageGraph(transformations);
+        verify(
+                lineageGraph,
+                isBatchMode() ? "plain-catalog-batch.json" : 
"plain-catalog-stream.json");
+    }
+
+    @Test
+    void testWithoutCatalog() throws Exception {
+        util.getTableEnv().executeSql("CREATE TEMPORARY TABLE Src LIKE 
FirstTable");
+        util.getTableEnv().executeSql("CREATE TEMPORARY TABLE Snk LIKE 
SecondTable");
+
+        List<Transformation<?>> transformations =
+                util.generateTransformations("INSERT INTO Snk SELECT * FROM 
Src");
+        LineageGraph lineageGraph = generateLineageGraph(transformations);
+        verify(
+                lineageGraph,
+                isBatchMode() ? "without-catalog-batch.json" : 
"without-catalog-stream.json");
+    }
+
     private LineageGraph generateLineageGraph(List<Transformation<?>> 
transformations) {
         StreamGraphGenerator streamGraphGenerator =
                 new StreamGraphGenerator(
diff --git 
a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a7640c70f42..30b07f2bd1e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -22,4 +22,5 @@ 
org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory
 org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory
 org.apache.flink.table.planner.factories.TestProcedureCatalogFactory
 org.apache.flink.table.planner.utils.TestSimpleDynamicTableSourceFactory
-org.apache.flink.table.planner.factories.TestValuesModelFactory
\ No newline at end of file
+org.apache.flink.table.planner.factories.TestValuesModelFactory
+org.apache.flink.table.planner.factories.PlainTestCatalogFactory
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-batch.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-batch.json
new file mode 100644
index 00000000000..81dc3106163
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-batch.json
@@ -0,0 +1,56 @@
+{
+  "lineageEdges" : [ {
+    "sourceLineageVertex" : {
+      "datasets" : [ {
+        "name" : "MyPlainCat.MyDatabase.Src",
+        "namespace" : "values://FromElementsFunction",
+        "objectPath" : {
+          "databaseName" : "MyDatabase",
+          "objectName" : "Src",
+          "fullName" : "MyDatabase.Src"
+        },
+        "facets" : { }
+      } ],
+      "boundedness" : "BOUNDED"
+    },
+    "sinkVertex" : {
+      "datasets" : [ {
+        "name" : "MyPlainCat.MyDatabase.Snk",
+        "namespace" : "values://AppendingSinkFunction",
+        "objectPath" : {
+          "databaseName" : "MyDatabase",
+          "objectName" : "Snk",
+          "fullName" : "MyDatabase.Snk"
+        },
+        "facets" : { }
+      } ],
+      "modifyType" : "INSERT"
+    }
+  } ],
+  "sources" : [ {
+    "datasets" : [ {
+      "name" : "MyPlainCat.MyDatabase.Src",
+      "namespace" : "values://FromElementsFunction",
+      "objectPath" : {
+        "databaseName" : "MyDatabase",
+        "objectName" : "Src",
+        "fullName" : "MyDatabase.Src"
+      },
+      "facets" : { }
+    } ],
+    "boundedness" : "BOUNDED"
+  } ],
+  "sinks" : [ {
+    "datasets" : [ {
+      "name" : "MyPlainCat.MyDatabase.Snk",
+      "namespace" : "values://AppendingSinkFunction",
+      "objectPath" : {
+        "databaseName" : "MyDatabase",
+        "objectName" : "Snk",
+        "fullName" : "MyDatabase.Snk"
+      },
+      "facets" : { }
+    } ],
+    "modifyType" : "INSERT"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-stream.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-stream.json
new file mode 100644
index 00000000000..723f1447327
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/plain-catalog-stream.json
@@ -0,0 +1,56 @@
+{
+  "lineageEdges" : [ {
+    "sourceLineageVertex" : {
+      "datasets" : [ {
+        "name" : "MyPlainCat.MyDatabase.Src",
+        "namespace" : "values://FromElementsFunction",
+        "objectPath" : {
+          "databaseName" : "MyDatabase",
+          "objectName" : "Src",
+          "fullName" : "MyDatabase.Src"
+        },
+        "facets" : { }
+      } ],
+      "boundedness" : "CONTINUOUS_UNBOUNDED"
+    },
+    "sinkVertex" : {
+      "datasets" : [ {
+        "name" : "MyPlainCat.MyDatabase.Snk",
+        "namespace" : "values://AppendingSinkFunction",
+        "objectPath" : {
+          "databaseName" : "MyDatabase",
+          "objectName" : "Snk",
+          "fullName" : "MyDatabase.Snk"
+        },
+        "facets" : { }
+      } ],
+      "modifyType" : "INSERT"
+    }
+  } ],
+  "sources" : [ {
+    "datasets" : [ {
+      "name" : "MyPlainCat.MyDatabase.Src",
+      "namespace" : "values://FromElementsFunction",
+      "objectPath" : {
+        "databaseName" : "MyDatabase",
+        "objectName" : "Src",
+        "fullName" : "MyDatabase.Src"
+      },
+      "facets" : { }
+    } ],
+    "boundedness" : "CONTINUOUS_UNBOUNDED"
+  } ],
+  "sinks" : [ {
+    "datasets" : [ {
+      "name" : "MyPlainCat.MyDatabase.Snk",
+      "namespace" : "values://AppendingSinkFunction",
+      "objectPath" : {
+        "databaseName" : "MyDatabase",
+        "objectName" : "Snk",
+        "fullName" : "MyDatabase.Snk"
+      },
+      "facets" : { }
+    } ],
+    "modifyType" : "INSERT"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-batch.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-batch.json
new file mode 100644
index 00000000000..8f818ef4449
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-batch.json
@@ -0,0 +1,56 @@
+{
+  "lineageEdges" : [ {
+    "sourceLineageVertex" : {
+      "datasets" : [ {
+        "name" : "default_catalog.default_database.Src",
+        "namespace" : "values://FromElementsFunction",
+        "objectPath" : {
+          "databaseName" : "default_database",
+          "objectName" : "Src",
+          "fullName" : "default_database.Src"
+        },
+        "facets" : { }
+      } ],
+      "boundedness" : "BOUNDED"
+    },
+    "sinkVertex" : {
+      "datasets" : [ {
+        "name" : "default_catalog.default_database.Snk",
+        "namespace" : "values://AppendingSinkFunction",
+        "objectPath" : {
+          "databaseName" : "default_database",
+          "objectName" : "Snk",
+          "fullName" : "default_database.Snk"
+        },
+        "facets" : { }
+      } ],
+      "modifyType" : "INSERT"
+    }
+  } ],
+  "sources" : [ {
+    "datasets" : [ {
+      "name" : "default_catalog.default_database.Src",
+      "namespace" : "values://FromElementsFunction",
+      "objectPath" : {
+        "databaseName" : "default_database",
+        "objectName" : "Src",
+        "fullName" : "default_database.Src"
+      },
+      "facets" : { }
+    } ],
+    "boundedness" : "BOUNDED"
+  } ],
+  "sinks" : [ {
+    "datasets" : [ {
+      "name" : "default_catalog.default_database.Snk",
+      "namespace" : "values://AppendingSinkFunction",
+      "objectPath" : {
+        "databaseName" : "default_database",
+        "objectName" : "Snk",
+        "fullName" : "default_database.Snk"
+      },
+      "facets" : { }
+    } ],
+    "modifyType" : "INSERT"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-stream.json
 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-stream.json
new file mode 100644
index 00000000000..86549608e0f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/lineage-graph/without-catalog-stream.json
@@ -0,0 +1,56 @@
+{
+  "lineageEdges" : [ {
+    "sourceLineageVertex" : {
+      "datasets" : [ {
+        "name" : "default_catalog.default_database.Src",
+        "namespace" : "values://FromElementsFunction",
+        "objectPath" : {
+          "databaseName" : "default_database",
+          "objectName" : "Src",
+          "fullName" : "default_database.Src"
+        },
+        "facets" : { }
+      } ],
+      "boundedness" : "CONTINUOUS_UNBOUNDED"
+    },
+    "sinkVertex" : {
+      "datasets" : [ {
+        "name" : "default_catalog.default_database.Snk",
+        "namespace" : "values://AppendingSinkFunction",
+        "objectPath" : {
+          "databaseName" : "default_database",
+          "objectName" : "Snk",
+          "fullName" : "default_database.Snk"
+        },
+        "facets" : { }
+      } ],
+      "modifyType" : "INSERT"
+    }
+  } ],
+  "sources" : [ {
+    "datasets" : [ {
+      "name" : "default_catalog.default_database.Src",
+      "namespace" : "values://FromElementsFunction",
+      "objectPath" : {
+        "databaseName" : "default_database",
+        "objectName" : "Src",
+        "fullName" : "default_database.Src"
+      },
+      "facets" : { }
+    } ],
+    "boundedness" : "CONTINUOUS_UNBOUNDED"
+  } ],
+  "sinks" : [ {
+    "datasets" : [ {
+      "name" : "default_catalog.default_database.Snk",
+      "namespace" : "values://AppendingSinkFunction",
+      "objectPath" : {
+        "databaseName" : "default_database",
+        "objectName" : "Snk",
+        "fullName" : "default_database.Snk"
+      },
+      "facets" : { }
+    } ],
+    "modifyType" : "INSERT"
+  } ]
+}
\ No newline at end of file


Reply via email to