This is an automated email from the ASF dual-hosted git repository.

liming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 475e48791b [core] Remove all lineage implementation (#4607)
475e48791b is described below

commit 475e48791b873c516c4c26774cda3b45a268cd70
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Nov 29 20:38:47 2024 +0800

    [core] Remove all lineage implementation (#4607)
---
 .../generated/catalog_configuration.html           |   6 -
 .../java/org/apache/paimon/factories/Factory.java  |   2 +-
 .../apache/paimon/lineage/DataLineageEntity.java   |  33 ----
 .../org/apache/paimon/lineage/LineageMeta.java     | 102 ----------
 .../apache/paimon/lineage/LineageMetaFactory.java  |  37 ----
 .../apache/paimon/lineage/TableLineageEntity.java  |  32 ----
 .../paimon/lineage/TableLineageEntityImpl.java     |  56 ------
 .../org/apache/paimon/options/CatalogOptions.java  |  22 ---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  26 +--
 .../apache/paimon/table/CatalogEnvironment.java    |  18 +-
 .../paimon/table/system/SinkTableLineageTable.java |  64 -------
 .../table/system/SourceTableLineageTable.java      |  64 -------
 .../paimon/table/system/SystemTableLoader.java     |  29 +--
 .../paimon/table/system/TableLineageTable.java     | 168 -----------------
 .../paimon/flink/AbstractFlinkTableFactory.java    |  58 +-----
 .../apache/paimon/flink/CatalogTableITCase.java    |   6 +-
 .../apache/paimon/flink/FlinkLineageITCase.java    | 206 ---------------------
 .../services/org.apache.paimon.factories.Factory   |   3 -
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   3 +-
 19 files changed, 12 insertions(+), 923 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 6706d5c421..63f7adda1e 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -86,12 +86,6 @@ under the License.
             <td>Boolean</td>
             <td>Whether to support format tables, format table corresponds to 
a regular csv, parquet or orc table, allowing read and write operations. 
However, during these processes, it does not connect to the metastore; hence, 
newly added partitions will not be reflected in the metastore and need to be 
manually added as separate partition operations.</td>
         </tr>
-        <tr>
-            <td><h5>lineage-meta</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>The lineage meta to store table and data lineage 
information.<br /><br />Possible values:<br /><ul><li>"jdbc": Use standard jdbc 
to store table and data lineage information.</li></ul><ul><li>"custom": You can 
implement LineageMetaFactory and LineageMeta to store lineage information in 
customized storage.</li></ul></td>
-        </tr>
         <tr>
             <td><h5>lock-acquire-timeout</h5></td>
             <td style="word-wrap: break-word;">8 min</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java 
b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
index b0f1ec84c1..74796879ef 100644
--- a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
@@ -20,7 +20,7 @@ package org.apache.paimon.factories;
 
 /**
  * Base interface for all kind of factories that create object instances from 
a list of key-value
- * pairs in Paimon's catalog, lineage.
+ * pairs in Paimon's catalog.
  *
  * <p>A factory is uniquely identified by {@link Class} and {@link 
#identifier()}.
  *
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
deleted file mode 100644
index e7401a9be3..0000000000
--- 
a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.data.Timestamp;
-
-/**
- * Data lineage entity with table lineage, barrier id and snapshot id for 
table source and sink
- * lineage.
- */
-public interface DataLineageEntity extends TableLineageEntity {
-    long getBarrierId();
-
-    long getSnapshotId();
-
-    Timestamp getCreateTime();
-}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
deleted file mode 100644
index 5d1c42daf6..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.predicate.Predicate;
-
-import javax.annotation.Nullable;
-
-import java.util.Iterator;
-
-/** Metadata store will manage table lineage and data lineage information for 
the catalog. */
-public interface LineageMeta extends AutoCloseable {
-    /**
-     * Save the source table and job lineage.
-     *
-     * @param entity the table lineage entity
-     */
-    void saveSourceTableLineage(TableLineageEntity entity);
-
-    /**
-     * Delete the source table lineage for given job.
-     *
-     * @param job the job for table lineage
-     */
-    void deleteSourceTableLineage(String job);
-
-    /**
-     * Get source table and job lineages.
-     *
-     * @param predicate the predicate for the table lineages
-     * @return the iterator for source table and job lineages
-     */
-    Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate 
predicate);
-
-    /**
-     * Save the sink table and job lineage.
-     *
-     * @param entity the table lineage entity
-     */
-    void saveSinkTableLineage(TableLineageEntity entity);
-
-    /**
-     * Get sink table and job lineages.
-     *
-     * @param predicate the predicate for the table lineages
-     * @return the iterator for sink table and job lineages
-     */
-    Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate 
predicate);
-
-    /**
-     * Delete the sink table lineage for given job.
-     *
-     * @param job the job for table lineage
-     */
-    void deleteSinkTableLineage(String job);
-
-    /**
-     * Save the source table and job lineage.
-     *
-     * @param entity the data lineage entity
-     */
-    void saveSourceDataLineage(DataLineageEntity entity);
-
-    /**
-     * Get source data and job lineages.
-     *
-     * @param predicate the predicate for the table lineages
-     * @return the iterator for source table and job lineages
-     */
-    Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate 
predicate);
-
-    /**
-     * Save the sink table and job lineage.
-     *
-     * @param entity the data lineage entity
-     */
-    void saveSinkDataLineage(DataLineageEntity entity);
-
-    /**
-     * Get sink data and job lineages.
-     *
-     * @param predicate the predicate for the table lineages
-     * @return the iterator for sink table and job lineages
-     */
-    Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate 
predicate);
-}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
deleted file mode 100644
index 11c6d3a117..0000000000
--- 
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.factories.Factory;
-import org.apache.paimon.options.Options;
-
-import java.io.Serializable;
-
-/** Factory to create {@link LineageMeta}. Each factory should have a unique 
identifier. */
-public interface LineageMetaFactory extends Factory, Serializable {
-
-    LineageMeta create(LineageMetaContext context);
-
-    /**
-     * Context has all options in a catalog and is used in factory to create 
{@link LineageMeta}.
-     */
-    interface LineageMetaContext {
-        Options options();
-    }
-}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
deleted file mode 100644
index c4312c4eb0..0000000000
--- 
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.data.Timestamp;
-
-/** Table lineage entity with database, table and job for table source and 
sink lineage. */
-public interface TableLineageEntity {
-    String getDatabase();
-
-    String getTable();
-
-    String getJob();
-
-    Timestamp getCreateTime();
-}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
 
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
deleted file mode 100644
index ef11ee87f1..0000000000
--- 
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lineage;
-
-import org.apache.paimon.data.Timestamp;
-
-/** Default implementation for {@link TableLineageEntity}. */
-public class TableLineageEntityImpl implements TableLineageEntity {
-    private final String database;
-    private final String table;
-    private final String job;
-    private final Timestamp timestamp;
-
-    public TableLineageEntityImpl(String database, String table, String job, 
Timestamp timestamp) {
-        this.database = database;
-        this.table = table;
-        this.job = job;
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public String getDatabase() {
-        return database;
-    }
-
-    @Override
-    public String getTable() {
-        return table;
-    }
-
-    @Override
-    public String getJob() {
-        return job;
-    }
-
-    @Override
-    public Timestamp getCreateTime() {
-        return timestamp;
-    }
-}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index f69af2d599..bb8cfae682 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.options;
 
-import org.apache.paimon.options.description.Description;
-import org.apache.paimon.options.description.TextElement;
 import org.apache.paimon.table.CatalogTableType;
 
 import java.time.Duration;
@@ -130,26 +128,6 @@ public class CatalogOptions {
                     .withDescription(
                             "Controls the max number for snapshots per table 
in the catalog are cached.");
 
-    public static final ConfigOption<String> LINEAGE_META =
-            key("lineage-meta")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The lineage meta to store table 
and data lineage information.")
-                                    .linebreak()
-                                    .linebreak()
-                                    .text("Possible values:")
-                                    .linebreak()
-                                    .list(
-                                            TextElement.text(
-                                                    "\"jdbc\": Use standard 
jdbc to store table and data lineage information."))
-                                    .list(
-                                            TextElement.text(
-                                                    "\"custom\": You can 
implement LineageMetaFactory and LineageMeta to store lineage information in 
customized storage."))
-                                    .build());
-
     public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
             ConfigOptions.key("allow-upper-case")
                     .booleanType()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 16b76513d7..2b277a29b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,7 +24,6 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreCommit;
@@ -62,7 +61,6 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
-import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -76,19 +74,14 @@ public abstract class AbstractCatalog implements Catalog {
     protected final Map<String, String> tableDefaultOptions;
     protected final Options catalogOptions;
 
-    @Nullable protected final LineageMetaFactory lineageMetaFactory;
-
     protected AbstractCatalog(FileIO fileIO) {
         this.fileIO = fileIO;
-        this.lineageMetaFactory = null;
         this.tableDefaultOptions = new HashMap<>();
         this.catalogOptions = new Options();
     }
 
     protected AbstractCatalog(FileIO fileIO, Options options) {
         this.fileIO = fileIO;
-        this.lineageMetaFactory =
-                findAndCreateLineageMeta(options, 
AbstractCatalog.class.getClassLoader());
         this.tableDefaultOptions = 
Catalog.tableDefaultOptions(options.toMap());
         this.catalogOptions = options;
     }
@@ -377,27 +370,13 @@ public abstract class AbstractCatalog implements Catalog {
     protected abstract void alterTableImpl(Identifier identifier, 
List<SchemaChange> changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException;
 
-    @Nullable
-    private LineageMetaFactory findAndCreateLineageMeta(Options options, 
ClassLoader classLoader) {
-        return options.getOptional(LINEAGE_META)
-                .map(
-                        meta ->
-                                FactoryUtil.discoverFactory(
-                                        classLoader, LineageMetaFactory.class, 
meta))
-                .orElse(null);
-    }
-
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         if (isSystemDatabase(identifier.getDatabaseName())) {
             String tableName = identifier.getTableName();
             Table table =
                     SystemTableLoader.loadGlobal(
-                            tableName,
-                            fileIO,
-                            this::allTablePaths,
-                            catalogOptions,
-                            lineageMetaFactory);
+                            tableName, fileIO, this::allTablePaths, 
catalogOptions);
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
@@ -444,8 +423,7 @@ public abstract class AbstractCatalog implements Catalog {
                                         lockFactory().orElse(null),
                                         lockContext().orElse(null),
                                         identifier),
-                                metastoreClientFactory(identifier, 
tableMeta.schema).orElse(null),
-                                lineageMetaFactory));
+                                metastoreClientFactory(identifier, 
tableMeta.schema).orElse(null)));
         CoreOptions options = table.coreOptions();
         if (options.type() == TableType.OBJECT_TABLE) {
             String objectLocation = options.objectLocation();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 9ff5f9b4f6..a722d9e21a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.Lock;
 
@@ -27,10 +26,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
-/**
- * Catalog environment in table which contains log factory, metastore client 
factory and lineage
- * meta.
- */
+/** Catalog environment in table which contains log factory, metastore client 
factory. */
 public class CatalogEnvironment implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -39,23 +35,20 @@ public class CatalogEnvironment implements Serializable {
     @Nullable private final String uuid;
     private final Lock.Factory lockFactory;
     @Nullable private final MetastoreClient.Factory metastoreClientFactory;
-    @Nullable private final LineageMetaFactory lineageMetaFactory;
 
     public CatalogEnvironment(
             @Nullable Identifier identifier,
             @Nullable String uuid,
             Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory,
-            @Nullable LineageMetaFactory lineageMetaFactory) {
+            @Nullable MetastoreClient.Factory metastoreClientFactory) {
         this.identifier = identifier;
         this.uuid = uuid;
         this.lockFactory = lockFactory;
         this.metastoreClientFactory = metastoreClientFactory;
-        this.lineageMetaFactory = lineageMetaFactory;
     }
 
     public static CatalogEnvironment empty() {
-        return new CatalogEnvironment(null, null, Lock.emptyFactory(), null, 
null);
+        return new CatalogEnvironment(null, null, Lock.emptyFactory(), null);
     }
 
     @Nullable
@@ -76,9 +69,4 @@ public class CatalogEnvironment implements Serializable {
     public MetastoreClient.Factory metastoreClientFactory() {
         return metastoreClientFactory;
     }
-
-    @Nullable
-    public LineageMetaFactory lineageMetaFactory() {
-        return lineageMetaFactory;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
deleted file mode 100644
index 71efce0704..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.system;
-
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerTableRead;
-
-import java.util.Map;
-
-/**
- * This is a system table to display all the sink table lineages.
- *
- * <pre>
- *  For example:
- *     If we select * from sys.sink_table_lineage, we will get
- *     database_name       table_name       job_name      create_time
- *        default            test0            job1    2023-10-22 20:35:12
- *       database1           test1            job1    2023-10-28 21:35:52
- *          ...               ...             ...             ...
- *     We can write sql to fetch the information we need.
- * </pre>
- */
-public class SinkTableLineageTable extends TableLineageTable {
-
-    public static final String SINK_TABLE_LINEAGE = "sink_table_lineage";
-
-    public SinkTableLineageTable(LineageMetaFactory lineageMetaFactory, 
Options options) {
-        super(lineageMetaFactory, options);
-    }
-
-    @Override
-    public InnerTableRead newRead() {
-        return new TableLineageRead(lineageMetaFactory, options, 
LineageMeta::sinkTableLineages);
-    }
-
-    @Override
-    public String name() {
-        return SINK_TABLE_LINEAGE;
-    }
-
-    @Override
-    public Table copy(Map<String, String> dynamicOptions) {
-        return new SinkTableLineageTable(lineageMetaFactory, options);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
deleted file mode 100644
index 5d9904fa66..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.system;
-
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.InnerTableRead;
-
-import java.util.Map;
-
-/**
- * This is a system table to display all the source table lineages.
- *
- * <pre>
- *  For example:
- *     If we select * from sys.source_table_lineage, we will get
- *     database_name       table_name       job_name      create_time
- *        default            test0            job1    2023-10-22 20:35:12
- *       database1           test1            job1    2023-10-28 21:35:52
- *          ...               ...             ...             ...
- *     We can write sql to fetch the information we need.
- * </pre>
- */
-public class SourceTableLineageTable extends TableLineageTable {
-
-    public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage";
-
-    public SourceTableLineageTable(LineageMetaFactory lineageMetaFactory, 
Options options) {
-        super(lineageMetaFactory, options);
-    }
-
-    @Override
-    public InnerTableRead newRead() {
-        return new TableLineageRead(lineageMetaFactory, options, 
LineageMeta::sourceTableLineages);
-    }
-
-    @Override
-    public String name() {
-        return SOURCE_TABLE_LINEAGE;
-    }
-
-    @Override
-    public Table copy(Map<String, String> dynamicOptions) {
-        return new SourceTableLineageTable(lineageMetaFactory, options);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 3d5b211316..763e4d1216 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.system;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -37,7 +36,6 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static 
org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS;
 import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
@@ -52,12 +50,9 @@ import static 
org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
 import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
-import static 
org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
-import static 
org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE;
 import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
 import static org.apache.paimon.table.system.TagsTable.TAGS;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Loader to load system {@link Table}s. */
 public class SystemTableLoader {
@@ -95,38 +90,18 @@ public class SystemTableLoader {
             String tableName,
             FileIO fileIO,
             Supplier<Map<String, Map<String, Path>>> allTablePaths,
-            Options catalogOptions,
-            @Nullable LineageMetaFactory lineageMetaFactory) {
+            Options catalogOptions) {
         switch (tableName.toLowerCase()) {
             case ALL_TABLE_OPTIONS:
                 return new AllTableOptionsTable(fileIO, allTablePaths.get());
             case CATALOG_OPTIONS:
                 return new CatalogOptionsTable(catalogOptions);
-            case SOURCE_TABLE_LINEAGE:
-                {
-                    checkNotNull(
-                            lineageMetaFactory,
-                            String.format(
-                                    "Lineage meta should be configured for 
catalog with %s",
-                                    LINEAGE_META.key()));
-                    return new SourceTableLineageTable(lineageMetaFactory, 
catalogOptions);
-                }
-            case SINK_TABLE_LINEAGE:
-                {
-                    checkNotNull(
-                            lineageMetaFactory,
-                            String.format(
-                                    "Lineage meta should be configured for 
catalog with %s",
-                                    LINEAGE_META.key()));
-                    return new SinkTableLineageTable(lineageMetaFactory, 
catalogOptions);
-                }
             default:
                 return null;
         }
     }
 
     public static List<String> loadGlobalTableNames() {
-        return Arrays.asList(
-                ALL_TABLE_OPTIONS, CATALOG_OPTIONS, SOURCE_TABLE_LINEAGE, 
SINK_TABLE_LINEAGE);
+        return Arrays.asList(ALL_TABLE_OPTIONS, CATALOG_OPTIONS);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
deleted file mode 100644
index aeaf3ca3b1..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.system;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.ReadonlyTable;
-import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.IteratorRecordReader;
-import org.apache.paimon.utils.ProjectedRow;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.BiFunction;
-
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
-
-/** Base lineage table for source and sink table lineage. */
-public abstract class TableLineageTable implements ReadonlyTable {
-    protected final LineageMetaFactory lineageMetaFactory;
-    protected final Options options;
-
-    public static final RowType TABLE_TYPE =
-            new RowType(
-                    Arrays.asList(
-                            new DataField(
-                                    0, "database_name", new 
VarCharType(VarCharType.MAX_LENGTH)),
-                            new DataField(1, "table_name", new 
VarCharType(VarCharType.MAX_LENGTH)),
-                            new DataField(2, "job_name", new 
VarCharType(VarCharType.MAX_LENGTH)),
-                            new DataField(3, "create_time", new 
TimestampType())));
-
-    protected TableLineageTable(LineageMetaFactory lineageMetaFactory, Options 
options) {
-        this.lineageMetaFactory = lineageMetaFactory;
-        this.options = options;
-    }
-
-    @Override
-    public InnerTableScan newScan() {
-        return new ReadOnceTableScan() {
-            @Override
-            public InnerTableScan withFilter(Predicate predicate) {
-                return this;
-            }
-
-            @Override
-            protected Plan innerPlan() {
-                /// TODO get the real row count for plan.
-                return () -> Collections.singletonList((Split) () -> 1L);
-            }
-        };
-    }
-
-    @Override
-    public RowType rowType() {
-        return TABLE_TYPE;
-    }
-
-    @Override
-    public List<String> primaryKeys() {
-        return Arrays.asList("database_name", "table_name", "job_name");
-    }
-
-    /** Table lineage read with lineage meta query. */
-    protected static class TableLineageRead implements InnerTableRead {
-        private final LineageMetaFactory lineageMetaFactory;
-        private final Options options;
-        private final BiFunction<LineageMeta, Predicate, 
Iterator<TableLineageEntity>>
-                tableLineageQuery;
-        @Nullable private Predicate predicate;
-        private RowType readType;
-
-        protected TableLineageRead(
-                LineageMetaFactory lineageMetaFactory,
-                Options options,
-                BiFunction<LineageMeta, Predicate, 
Iterator<TableLineageEntity>>
-                        tableLineageQuery) {
-            this.lineageMetaFactory = lineageMetaFactory;
-            this.options = options;
-            this.tableLineageQuery = tableLineageQuery;
-            this.predicate = null;
-        }
-
-        @Override
-        public InnerTableRead withFilter(Predicate predicate) {
-            this.predicate = predicate;
-            return this;
-        }
-
-        @Override
-        public InnerTableRead withReadType(RowType readType) {
-            this.readType = readType;
-            return this;
-        }
-
-        @Override
-        public TableRead withIOManager(IOManager ioManager) {
-            return this;
-        }
-
-        @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
-            try (LineageMeta lineageMeta = lineageMetaFactory.create(() -> 
options)) {
-                Iterator<TableLineageEntity> sourceTableLineages =
-                        tableLineageQuery.apply(lineageMeta, predicate);
-                return new IteratorRecordReader<>(
-                        Iterators.transform(
-                                sourceTableLineages,
-                                entity -> {
-                                    checkNotNull(entity);
-                                    GenericRow row =
-                                            GenericRow.of(
-                                                    
BinaryString.fromString(entity.getDatabase()),
-                                                    
BinaryString.fromString(entity.getTable()),
-                                                    
BinaryString.fromString(entity.getJob()),
-                                                    entity.getCreateTime());
-                                    if (readType != null) {
-                                        return ProjectedRow.from(
-                                                        readType, 
TableLineageTable.TABLE_TYPE)
-                                                .replaceRow(row);
-                                    } else {
-                                        return row;
-                                    }
-                                }));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 9f90a2cd01..6b10dbb84b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -25,15 +25,10 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.source.DataTableSource;
 import org.apache.paimon.flink.source.SystemTableSource;
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.lineage.TableLineageEntityImpl;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.options.OptionsUtils;
 import org.apache.paimon.schema.Schema;
@@ -47,7 +42,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.ValidationException;
@@ -71,7 +65,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.BiConsumer;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
@@ -109,23 +102,9 @@ public abstract class AbstractFlinkTableFactory
                     isStreamingMode,
                     context.getObjectIdentifier());
         } else {
-            Table table = buildPaimonTable(context);
-            if (table instanceof FileStoreTable) {
-                storeTableLineage(
-                        ((FileStoreTable) 
table).catalogEnvironment().lineageMetaFactory(),
-                        context,
-                        (entity, lineageFactory) -> {
-                            try (LineageMeta lineage =
-                                    lineageFactory.create(() -> 
Options.fromMap(table.options()))) {
-                                lineage.saveSourceTableLineage(entity);
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-            }
             return new DataTableSource(
                     context.getObjectIdentifier(),
-                    table,
+                    buildPaimonTable(context),
                     isStreamingMode,
                     context,
                     createOptionalLogStoreFactory(context).orElse(null));
@@ -134,46 +113,13 @@ public abstract class AbstractFlinkTableFactory
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
-        Table table = buildPaimonTable(context);
-        if (table instanceof FileStoreTable) {
-            storeTableLineage(
-                    ((FileStoreTable) 
table).catalogEnvironment().lineageMetaFactory(),
-                    context,
-                    (entity, lineageFactory) -> {
-                        try (LineageMeta lineage =
-                                lineageFactory.create(() -> 
Options.fromMap(table.options()))) {
-                            lineage.saveSinkTableLineage(entity);
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
-        }
         return new FlinkTableSink(
                 context.getObjectIdentifier(),
-                table,
+                buildPaimonTable(context),
                 context,
                 createOptionalLogStoreFactory(context).orElse(null));
     }
 
-    private void storeTableLineage(
-            @Nullable LineageMetaFactory lineageMetaFactory,
-            Context context,
-            BiConsumer<TableLineageEntity, LineageMetaFactory> tableLineage) {
-        if (lineageMetaFactory != null) {
-            String pipelineName = 
context.getConfiguration().get(PipelineOptions.NAME);
-            if (pipelineName == null) {
-                throw new ValidationException("Cannot get pipeline name for 
lineage meta.");
-            }
-            tableLineage.accept(
-                    new TableLineageEntityImpl(
-                            context.getObjectIdentifier().getDatabaseName(),
-                            context.getObjectIdentifier().getObjectName(),
-                            pipelineName,
-                            
Timestamp.fromEpochMillis(System.currentTimeMillis())),
-                    lineageMetaFactory);
-        }
-    }
-
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
         return Collections.emptySet();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 96334de3f8..10b03b7139 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -21,8 +21,6 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.system.AllTableOptionsTable;
 import org.apache.paimon.table.system.CatalogOptionsTable;
-import org.apache.paimon.table.system.SinkTableLineageTable;
-import org.apache.paimon.table.system.SourceTableLineageTable;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.commons.lang3.StringUtils;
@@ -200,9 +198,7 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         assertThat(sql("SHOW TABLES"))
                 .containsExactlyInAnyOrder(
                         Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS),
-                        Row.of(CatalogOptionsTable.CATALOG_OPTIONS),
-                        Row.of(SourceTableLineageTable.SOURCE_TABLE_LINEAGE),
-                        Row.of(SinkTableLineageTable.SINK_TABLE_LINEAGE));
+                        Row.of(CatalogOptionsTable.CATALOG_OPTIONS));
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
deleted file mode 100644
index 5b61d5272f..0000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.lineage.DataLineageEntity;
-import org.apache.paimon.lineage.LineageMeta;
-import org.apache.paimon.lineage.LineageMetaFactory;
-import org.apache.paimon.lineage.TableLineageEntity;
-import org.apache.paimon.predicate.Predicate;
-
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** ITCase for flink table and data lineage. */
-public class FlinkLineageITCase extends CatalogITCaseBase {
-    private static final String THROWING_META = "throwing-meta";
-    private static final Map<String, Map<String, TableLineageEntity>> 
jobSourceTableLineages =
-            new HashMap<>();
-    private static final Map<String, Map<String, TableLineageEntity>> 
jobSinkTableLineages =
-            new HashMap<>();
-
-    @Override
-    protected List<String> ddl() {
-        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, 
b INT, c INT)");
-    }
-
-    @Override
-    protected Map<String, String> catalogOptions() {
-        return Collections.singletonMap(LINEAGE_META.key(), THROWING_META);
-    }
-
-    @Test
-    public void testTableLineage() throws Exception {
-        // Validate for source and sink lineage when pipeline name is null
-        assertThatThrownBy(
-                        () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 
3),(4, 5, 6);").await())
-                .hasCauseExactlyInstanceOf(ValidationException.class)
-                .hasRootCauseMessage("Cannot get pipeline name for lineage 
meta.");
-        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM 
T").collect().close())
-                .hasCauseExactlyInstanceOf(ValidationException.class)
-                .hasRootCauseMessage("Cannot get pipeline name for lineage 
meta.");
-
-        // Call storeSinkTableLineage and storeSourceTableLineage methods
-        tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, 
"insert_t_job");
-        tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await();
-        assertThat(jobSinkTableLineages).isNotEmpty();
-        TableLineageEntity sinkTableLineage =
-                
jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job");
-        assertThat(sinkTableLineage.getTable()).isEqualTo("T");
-
-        List<Row> sinkTableRows = new ArrayList<>();
-        try (CloseableIterator<Row> iterator =
-                tEnv.executeSql("SELECT * FROM 
sys.sink_table_lineage").collect()) {
-            while (iterator.hasNext()) {
-                sinkTableRows.add(iterator.next());
-            }
-        }
-        assertThat(sinkTableRows.size()).isEqualTo(1);
-        Row sinkTableRow = sinkTableRows.get(0);
-        
assertThat(sinkTableRow.getField("database_name")).isEqualTo("default");
-        assertThat(sinkTableRow.getField("table_name")).isEqualTo("T");
-        
assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job");
-
-        tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, 
"select_t_job");
-        tEnv.executeSql("SELECT * FROM T").collect().close();
-        assertThat(jobSourceTableLineages).isNotEmpty();
-        TableLineageEntity sourceTableLineage =
-                
jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job");
-        assertThat(sourceTableLineage.getTable()).isEqualTo("T");
-
-        List<Row> sourceTableRows = new ArrayList<>();
-        try (CloseableIterator<Row> iterator =
-                tEnv.executeSql("SELECT * FROM 
sys.source_table_lineage").collect()) {
-            while (iterator.hasNext()) {
-                sourceTableRows.add(iterator.next());
-            }
-        }
-        assertThat(sourceTableRows.size()).isEqualTo(1);
-        Row sourceTableRow = sourceTableRows.get(0);
-        
assertThat(sourceTableRow.getField("database_name")).isEqualTo("default");
-        assertThat(sourceTableRow.getField("table_name")).isEqualTo("T");
-        
assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job");
-    }
-
-    private static String getTableLineageKey(TableLineageEntity entity) {
-        return String.format("%s.%s.%s", entity.getDatabase(), 
entity.getTable(), entity.getJob());
-    }
-
-    /** Factory to create throwing lineage meta. */
-    public static class TestingMemoryLineageMetaFactory implements 
LineageMetaFactory {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public String identifier() {
-            return THROWING_META;
-        }
-
-        @Override
-        public LineageMeta create(LineageMetaContext context) {
-            return new TestingMemoryLineageMeta();
-        }
-    }
-
-    /** Throwing specific exception in each method. */
-    private static class TestingMemoryLineageMeta implements LineageMeta {
-
-        @Override
-        public void saveSourceTableLineage(TableLineageEntity entity) {
-            jobSourceTableLineages
-                    .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
-                    .put(getTableLineageKey(entity), entity);
-        }
-
-        @Override
-        public void deleteSourceTableLineage(String job) {
-            jobSourceTableLineages.remove(job);
-        }
-
-        @Override
-        public Iterator<TableLineageEntity> sourceTableLineages(@Nullable 
Predicate predicate) {
-            return jobSourceTableLineages.values().stream()
-                    .flatMap(v -> v.values().stream())
-                    .iterator();
-        }
-
-        @Override
-        public void saveSinkTableLineage(TableLineageEntity entity) {
-            assertThat(entity.getJob()).isEqualTo("insert_t_job");
-            assertThat(entity.getTable()).isEqualTo("T");
-            assertThat(entity.getDatabase()).isEqualTo("default");
-            jobSinkTableLineages
-                    .computeIfAbsent(entity.getJob(), key -> new HashMap<>())
-                    .put(getTableLineageKey(entity), entity);
-        }
-
-        @Override
-        public Iterator<TableLineageEntity> sinkTableLineages(@Nullable 
Predicate predicate) {
-            return jobSinkTableLineages.values().stream()
-                    .flatMap(v -> v.values().stream())
-                    .iterator();
-        }
-
-        @Override
-        public void deleteSinkTableLineage(String job) {
-            jobSinkTableLineages.remove(job);
-        }
-
-        @Override
-        public void saveSourceDataLineage(DataLineageEntity entity) {
-            assertThat(entity.getJob()).isEqualTo("select_t_job");
-            assertThat(entity.getTable()).isEqualTo("T");
-            assertThat(entity.getDatabase()).isEqualTo("default");
-            throw new UnsupportedOperationException("Method 
saveSinkTableLineage is not supported");
-        }
-
-        @Override
-        public Iterator<DataLineageEntity> sourceDataLineages(@Nullable 
Predicate predicate) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void saveSinkDataLineage(DataLineageEntity entity) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Iterator<DataLineageEntity> sinkDataLineages(@Nullable 
Predicate predicate) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close() throws Exception {}
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index fcb6fe9829..3c05b5fba3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -15,8 +15,5 @@
 
 org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
 
-# Lineage meta factory
-org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
-
 # Catalog lock factory
 
org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
\ No newline at end of file
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 9a90995f28..5157e60600 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -610,8 +610,7 @@ public class HiveCatalog extends AbstractCatalog {
                                     lockFactory().orElse(null),
                                     lockContext().orElse(null),
                                     identifier),
-                            metastoreClientFactory(identifier, 
tableMeta.schema()).orElse(null),
-                            lineageMetaFactory));
+                            metastoreClientFactory(identifier, 
tableMeta.schema()).orElse(null)));
         } catch (TableNotExistException ignore) {
         }
 

Reply via email to