This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8979b7acf [core][lineage] Introduce lineage meta interfaces (#1531)
8979b7acf is described below

commit 8979b7acfdf0c10f6526ee66f6306b83f5895729
Author: Shammon FY <[email protected]>
AuthorDate: Wed Aug 2 11:19:54 2023 +0800

    [core][lineage] Introduce lineage meta interfaces (#1531)
    
    * [core][lineage] Introduce lineage meta interfaces
---
 .../generated/catalog_configuration.html           |   6 +
 .../apache/paimon/lineage/DataLineageEntity.java   |  33 +++++
 .../org/apache/paimon/lineage/LineageMeta.java     | 103 ++++++++++++++
 .../apache/paimon/lineage/LineageMetaFactory.java  |  35 +++++
 .../apache/paimon/lineage/TableLineageEntity.java  |  32 +++++
 .../paimon/lineage/TableLineageEntityImpl.java     |  56 ++++++++
 .../org/apache/paimon/options/CatalogOptions.java  |  22 +++
 .../org/apache/paimon/catalog/AbstractCatalog.java |  32 ++++-
 .../paimon/table/AbstractFileStoreTable.java       |  27 ++--
 .../paimon/table/AppendOnlyFileStoreTable.java     |  13 +-
 .../apache/paimon/table/CatalogEnvironment.java    |  63 +++++++++
 .../table/ChangelogValueCountFileStoreTable.java   |  12 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |  13 +-
 .../org/apache/paimon/table/FileStoreTable.java    |   2 +
 .../apache/paimon/table/FileStoreTableFactory.java |  40 +++---
 .../apache/paimon/table/sink/TableCommitTest.java  |   4 +-
 ...tinuousAppendAndCompactFollowUpScannerTest.java |   7 +-
 .../table/source/snapshot/ScannerTestBase.java     |   7 +-
 .../paimon/flink/AbstractFlinkTableFactory.java    |  43 +++++-
 .../org/apache/paimon/flink/CatalogITCaseBase.java |  20 ++-
 .../apache/paimon/flink/FlinkLineageITCase.java    | 155 +++++++++++++++++++++
 .../apache/paimon/flink/sink/FlinkSinkTest.java    |   4 +-
 .../services/org.apache.paimon.factories.Factory   |   5 +-
 23 files changed, 658 insertions(+), 76 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 118ecc44e..d243737a8 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Boolean</td>
             <td>Allow to fallback to hadoop File IO when no file io found for 
the scheme.</td>
         </tr>
+        <tr>
+            <td><h5>lineage-meta</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The lineage meta to store table and data lineage 
information.<br /><br />Possible values:<br /><ul><li>"jdbc": Use standard jdbc 
to store table and data lineage information.</li></ul><ul><li>"custom": You can 
implement LineageMetaFactory and LineageMeta to store lineage information in 
customized storage.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>lock-acquire-timeout</h5></td>
             <td style="word-wrap: break-word;">8 min</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
new file mode 100644
index 000000000..e7401a9be
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.data.Timestamp;
+
+/**
+ * Data lineage entity with table lineage, barrier id and snapshot id for 
table source and sink
+ * lineage.
+ */
+public interface DataLineageEntity extends TableLineageEntity {
+    long getBarrierId();
+
+    long getSnapshotId();
+
+    Timestamp getCreateTime();
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
new file mode 100644
index 000000000..3fce04aaa
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.predicate.Predicate;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/** Metadata store will manage table lineage and data lineage information for 
the catalog. */
+public interface LineageMeta extends Serializable {
+    /**
+     * Save the source table and job lineage.
+     *
+     * @param entity the table lineage entity
+     */
+    void saveSourceTableLineage(TableLineageEntity entity);
+
+    /**
+     * Delete the source table lineage for given job.
+     *
+     * @param job the job for table lineage
+     */
+    void deleteSourceTableLineage(String job);
+
+    /**
+     * Get source table and job lineages.
+     *
+     * @param predicate the predicate for the table lineages
+     * @return the iterator for source table and job lineages
+     */
+    Iterator<TableLineageEntity> sourceTableLineages(@Nullable Predicate 
predicate);
+
+    /**
+     * Save the sink table and job lineage.
+     *
+     * @param entity the table lineage entity
+     */
+    void saveSinkTableLineage(TableLineageEntity entity);
+
+    /**
+     * Get sink table and job lineages.
+     *
+     * @param predicate the predicate for the table lineages
+     * @return the iterator for sink table and job lineages
+     */
+    Iterator<TableLineageEntity> sinkTableLineages(@Nullable Predicate 
predicate);
+
+    /**
+     * Delete the sink table lineage for given job.
+     *
+     * @param job the job for table lineage
+     */
+    void deleteSinkTableLineage(String job);
+
+    /**
+     * Save the source table and job lineage.
+     *
+     * @param entity the data lineage entity
+     */
+    void saveSourceDataLineage(DataLineageEntity entity);
+
+    /**
+     * Get source data and job lineages.
+     *
+     * @param predicate the predicate for the table lineages
+     * @return the iterator for source table and job lineages
+     */
+    Iterator<DataLineageEntity> sourceDataLineages(@Nullable Predicate 
predicate);
+
+    /**
+     * Save the sink table and job lineage.
+     *
+     * @param entity the data lineage entity
+     */
+    void saveSinkDataLineage(DataLineageEntity entity);
+
+    /**
+     * Get sink data and job lineages.
+     *
+     * @param predicate the predicate for the table lineages
+     * @return the iterator for sink table and job lineages
+     */
+    Iterator<DataLineageEntity> sinkDataLineages(@Nullable Predicate 
predicate);
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
new file mode 100644
index 000000000..5cb7f8bae
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link LineageMeta}. Each factory should have a unique 
identifier. */
+public interface LineageMetaFactory extends Factory {
+
+    LineageMeta create(LineageMetaContext context);
+
+    /**
+     * Context has all options in a catalog and is used in factory to create 
{@link LineageMeta}.
+     */
+    interface LineageMetaContext {
+        Options options();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java 
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
new file mode 100644
index 000000000..c4312c4eb
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.data.Timestamp;
+
+/** Table lineage entity with database, table and job for table source and 
sink lineage. */
+public interface TableLineageEntity {
+    String getDatabase();
+
+    String getTable();
+
+    String getJob();
+
+    Timestamp getCreateTime();
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
 
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
new file mode 100644
index 000000000..ef11ee87f
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lineage;
+
+import org.apache.paimon.data.Timestamp;
+
+/** Default implementation for {@link TableLineageEntity}. */
+public class TableLineageEntityImpl implements TableLineageEntity {
+    private final String database;
+    private final String table;
+    private final String job;
+    private final Timestamp timestamp;
+
+    public TableLineageEntityImpl(String database, String table, String job, 
Timestamp timestamp) {
+        this.database = database;
+        this.table = table;
+        this.job = job;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getDatabase() {
+        return database;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public String getJob() {
+        return job;
+    }
+
+    @Override
+    public Timestamp getCreateTime() {
+        return timestamp;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index e853b67c5..0fba499dd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.options;
 
+import org.apache.paimon.options.description.Description;
+import org.apache.paimon.options.description.TextElement;
 import org.apache.paimon.table.TableType;
 
 import java.time.Duration;
@@ -75,4 +77,24 @@ public class CatalogOptions {
                     .defaultValue(true)
                     .withDescription(
                             "Allow to fallback to hadoop File IO when no file 
io found for the scheme.");
+
+    public static final ConfigOption<String> LINEAGE_META =
+            key("lineage-meta")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The lineage meta to store table 
and data lineage information.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text("Possible values:")
+                                    .linebreak()
+                                    .list(
+                                            TextElement.text(
+                                                    "\"jdbc\": Use standard 
jdbc to store table and data lineage information."))
+                                    .list(
+                                            TextElement.text(
+                                                    "\"custom\": You can 
implement LineageMetaFactory and LineageMeta to store lineage information in 
customized storage."))
+                                    .build());
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 38d3eef20..f958adda0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -19,10 +19,15 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
 import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
@@ -30,11 +35,15 @@ import org.apache.paimon.table.system.AllTableOptionsTable;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+
 /** Common implementation of {@link Catalog}. */
 public abstract class AbstractCatalog implements Catalog {
 
@@ -46,13 +55,19 @@ public abstract class AbstractCatalog implements Catalog {
     protected final FileIO fileIO;
     protected final Map<String, String> tableDefaultOptions;
 
+    @Nullable protected final LineageMeta lineageMeta;
+
     protected AbstractCatalog(FileIO fileIO) {
         this.fileIO = fileIO;
+        this.lineageMeta = null;
         this.tableDefaultOptions = new HashMap<>();
     }
 
     protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
         this.fileIO = fileIO;
+        this.lineageMeta =
+                findAndCreateLineageMeta(
+                        Options.fromMap(options), 
AbstractCatalog.class.getClassLoader());
         this.tableDefaultOptions = new HashMap<>();
 
         options.keySet().stream()
@@ -64,6 +79,17 @@ public abstract class AbstractCatalog implements Catalog {
                                         options.get(key)));
     }
 
+    @Nullable
+    private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader 
classLoader) {
+        return options.getOptional(LINEAGE_META)
+                .map(
+                        meta ->
+                                FactoryUtil.discoverFactory(
+                                                classLoader, 
LineageMetaFactory.class, meta)
+                                        .create(() -> options))
+                .orElse(null);
+    }
+
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         if (isSystemDatabase(identifier.getDatabaseName())) {
@@ -95,8 +121,10 @@ public abstract class AbstractCatalog implements Catalog {
                 fileIO,
                 getDataTableLocation(identifier),
                 tableSchema,
-                Lock.factory(lockFactory().orElse(null), identifier),
-                metastoreClientFactory(identifier).orElse(null));
+                new CatalogEnvironment(
+                        Lock.factory(lockFactory().orElse(null), identifier),
+                        metastoreClientFactory(identifier).orElse(null),
+                        lineageMeta));
     }
 
     @VisibleForTesting
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 15bb81e61..597943b20 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -26,10 +26,8 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
-import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
@@ -57,8 +55,6 @@ import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -80,15 +76,13 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     protected final FileIO fileIO;
     protected final Path path;
     protected final TableSchema tableSchema;
-    protected final Lock.Factory lockFactory;
-    @Nullable protected final MetastoreClient.Factory metastoreClientFactory;
+    protected final CatalogEnvironment catalogEnvironment;
 
     public AbstractFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
-            Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory) {
+            CatalogEnvironment catalogEnvironment) {
         this.fileIO = fileIO;
         this.path = path;
         if (!tableSchema.options().containsKey(PATH.key())) {
@@ -98,8 +92,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
             tableSchema = tableSchema.copy(newOptions);
         }
         this.tableSchema = tableSchema;
-        this.lockFactory = lockFactory;
-        this.metastoreClientFactory = metastoreClientFactory;
+        this.catalogEnvironment = catalogEnvironment;
     }
 
     @Override
@@ -107,6 +100,11 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         return store().bucketMode();
     }
 
+    @Override
+    public CatalogEnvironment catalogEnvironment() {
+        return catalogEnvironment;
+    }
+
     public RowKeyExtractor createRowKeyExtractor() {
         switch (bucketMode()) {
             case FIXED:
@@ -263,15 +261,18 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 coreOptions().writeOnly() ? null : store().newExpire(),
                 coreOptions().writeOnly() ? null : 
store().newPartitionExpire(commitUser),
                 coreOptions().writeOnly() ? null : 
store().newTagCreationManager(),
-                lockFactory.create(),
+                catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path));
     }
 
     private List<CommitCallback> createCommitCallbacks() {
         List<CommitCallback> callbacks = new 
ArrayList<>(loadCommitCallbacks());
-        if (coreOptions().partitionedTableInMetastore() && 
metastoreClientFactory != null) {
-            callbacks.add(new 
AddPartitionCommitCallback(metastoreClientFactory.create()));
+        if (coreOptions().partitionedTableInMetastore()
+                && catalogEnvironment.metastoreClientFactory() != null) {
+            callbacks.add(
+                    new AddPartitionCommitCallback(
+                            
catalogEnvironment.metastoreClientFactory().create()));
         }
         return callbacks;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 787dbeff1..630a100e2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -26,7 +26,6 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.AppendOnlyFileStoreRead;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -45,8 +44,6 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.function.BiConsumer;
 
@@ -58,22 +55,20 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     private transient AppendOnlyFileStore lazyStore;
 
     AppendOnlyFileStoreTable(FileIO fileIO, Path path, TableSchema 
tableSchema) {
-        this(fileIO, path, tableSchema, Lock.emptyFactory(), null);
+        this(fileIO, path, tableSchema, new 
CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
     AppendOnlyFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
-            Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory) {
-        super(fileIO, path, tableSchema, lockFactory, metastoreClientFactory);
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, path, tableSchema, catalogEnvironment);
     }
 
     @Override
     protected FileStoreTable copy(TableSchema newTableSchema) {
-        return new AppendOnlyFileStoreTable(
-                fileIO, path, newTableSchema, lockFactory, 
metastoreClientFactory);
+        return new AppendOnlyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
new file mode 100644
index 000000000..3fdaea2da
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.operation.Lock;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * Catalog environment in table which contains log factory, metastore client 
factory and lineage
+ * meta.
+ */
+public class CatalogEnvironment implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Lock.Factory lockFactory;
+    @Nullable private final MetastoreClient.Factory metastoreClientFactory;
+    @Nullable private final LineageMeta lineageMeta;
+
+    public CatalogEnvironment(
+            Lock.Factory lockFactory,
+            @Nullable MetastoreClient.Factory metastoreClientFactory,
+            @Nullable LineageMeta lineageMeta) {
+        this.lockFactory = lockFactory;
+        this.metastoreClientFactory = metastoreClientFactory;
+        this.lineageMeta = lineageMeta;
+    }
+
+    public Lock.Factory lockFactory() {
+        return lockFactory;
+    }
+
+    @Nullable
+    public MetastoreClient.Factory metastoreClientFactory() {
+        return metastoreClientFactory;
+    }
+
+    @Nullable
+    public LineageMeta lineageMeta() {
+        return lineageMeta;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index d515c5d7d..035816e1c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -29,7 +29,6 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.Lock;
@@ -49,8 +48,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.function.BiConsumer;
@@ -64,22 +61,21 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     private transient KeyValueFileStore lazyStore;
 
     ChangelogValueCountFileStoreTable(FileIO fileIO, Path path, TableSchema 
tableSchema) {
-        this(fileIO, path, tableSchema, Lock.emptyFactory(), null);
+        this(fileIO, path, tableSchema, new 
CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
     ChangelogValueCountFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
-            Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory) {
-        super(fileIO, path, tableSchema, lockFactory, metastoreClientFactory);
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, path, tableSchema, catalogEnvironment);
     }
 
     @Override
     protected FileStoreTable copy(TableSchema newTableSchema) {
         return new ChangelogValueCountFileStoreTable(
-                fileIO, path, newTableSchema, lockFactory, 
metastoreClientFactory);
+                fileIO, path, newTableSchema, catalogEnvironment);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 8c764bca4..624d9fc41 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -33,7 +33,6 @@ import 
org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
 import org.apache.paimon.operation.Lock;
@@ -52,8 +51,6 @@ import 
org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -71,22 +68,20 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     private transient KeyValueFileStore lazyStore;
 
     ChangelogWithKeyFileStoreTable(FileIO fileIO, Path path, TableSchema 
tableSchema) {
-        this(fileIO, path, tableSchema, Lock.emptyFactory(), null);
+        this(fileIO, path, tableSchema, new 
CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
     ChangelogWithKeyFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
-            Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory) {
-        super(fileIO, path, tableSchema, lockFactory, metastoreClientFactory);
+            CatalogEnvironment catalogEnvironment) {
+        super(fileIO, path, tableSchema, catalogEnvironment);
     }
 
     @Override
     protected FileStoreTable copy(TableSchema newTableSchema) {
-        return new ChangelogWithKeyFileStoreTable(
-                fileIO, path, newTableSchema, lockFactory, 
metastoreClientFactory);
+        return new ChangelogWithKeyFileStoreTable(fileIO, path, 
newTableSchema, catalogEnvironment);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 5206c00c6..860bb91be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -74,6 +74,8 @@ public interface FileStoreTable extends DataTable {
 
     BucketMode bucketMode();
 
+    CatalogEnvironment catalogEnvironment();
+
     @Override
     FileStoreTable copy(Map<String, String> dynamicOptions);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 7427d1549..9f58a19c5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -23,14 +23,11 @@ import org.apache.paimon.WriteMode;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
@@ -66,21 +63,29 @@ public class FileStoreTableFactory {
                                                 "Schema file not found in 
location "
                                                         + tablePath
                                                         + ". Please create 
table first."));
-        return create(fileIO, tablePath, tableSchema, options, 
Lock.emptyFactory(), null);
+        return create(
+                fileIO,
+                tablePath,
+                tableSchema,
+                options,
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
     public static FileStoreTable create(FileIO fileIO, Path tablePath, 
TableSchema tableSchema) {
-        return create(fileIO, tablePath, tableSchema, new Options(), 
Lock.emptyFactory(), null);
+        return create(
+                fileIO,
+                tablePath,
+                tableSchema,
+                new Options(),
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
     public static FileStoreTable create(
             FileIO fileIO,
             Path tablePath,
             TableSchema tableSchema,
-            Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory) {
-        return create(
-                fileIO, tablePath, tableSchema, new Options(), lockFactory, 
metastoreClientFactory);
+            CatalogEnvironment catalogEnvironment) {
+        return create(fileIO, tablePath, tableSchema, new Options(), 
catalogEnvironment);
     }
 
     public static FileStoreTable create(
@@ -88,8 +93,7 @@ public class FileStoreTableFactory {
             Path tablePath,
             TableSchema tableSchema,
             Options dynamicOptions,
-            Lock.Factory lockFactory,
-            @Nullable MetastoreClient.Factory metastoreClientFactory) {
+            CatalogEnvironment catalogEnvironment) {
         FileStoreTable table;
         Options coreOptions = Options.fromMap(tableSchema.options());
         WriteMode writeMode = coreOptions.get(CoreOptions.WRITE_MODE);
@@ -103,24 +107,16 @@ public class FileStoreTableFactory {
         if (writeMode == WriteMode.APPEND_ONLY) {
             table =
                     new AppendOnlyFileStoreTable(
-                            fileIO, tablePath, tableSchema, lockFactory, 
metastoreClientFactory);
+                            fileIO, tablePath, tableSchema, 
catalogEnvironment);
         } else {
             if (tableSchema.primaryKeys().isEmpty()) {
                 table =
                         new ChangelogValueCountFileStoreTable(
-                                fileIO,
-                                tablePath,
-                                tableSchema,
-                                lockFactory,
-                                metastoreClientFactory);
+                                fileIO, tablePath, tableSchema, 
catalogEnvironment);
             } else {
                 table =
                         new ChangelogWithKeyFileStoreTable(
-                                fileIO,
-                                tablePath,
-                                tableSchema,
-                                lockFactory,
-                                metastoreClientFactory);
+                                fileIO, tablePath, tableSchema, 
catalogEnvironment);
             }
         }
         return table.copy(dynamicOptions.toMap());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index fff72a787..d6847cb34 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.DataType;
@@ -112,8 +113,7 @@ public class TableCommitTest {
                         new FailingFileIO(),
                         new Path(path),
                         tableSchema,
-                        Lock.emptyFactory(),
-                        null);
+                        new CatalogEnvironment(Lock.emptyFactory(), null, 
null));
 
         String commitUser = UUID.randomUUID().toString();
         StreamTableWrite write = table.newWrite(commitUser);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
index 9b498547d..e2b5d7362 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousAppendAndCompactFollowUpScannerTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -144,6 +145,10 @@ public class ContinuousAppendAndCompactFollowUpScannerTest 
extends ScannerTestBa
                                 conf.toMap(),
                                 ""));
         return FileStoreTableFactory.create(
-                fileIO, tablePath, tableSchema, conf, Lock.emptyFactory(), 
null);
+                fileIO,
+                tablePath,
+                tableSchema,
+                conf,
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index 04eea4393..df1d2b533 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.DataSplit;
@@ -138,7 +139,11 @@ public abstract class ScannerTestBase {
                                 conf.toMap(),
                                 ""));
         return FileStoreTableFactory.create(
-                fileIO, tablePath, tableSchema, conf, Lock.emptyFactory(), 
null);
+                fileIO,
+                tablePath,
+                tableSchema,
+                conf,
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
     protected List<Split> toSplits(List<DataSplit> dataSplits) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 4ac6d542a..a9d72f003 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.LogConsistency;
 import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.source.DataTableSource;
@@ -30,8 +31,12 @@ import org.apache.paimon.flink.source.SystemTableSource;
 import org.apache.paimon.flink.source.table.PushedRichTableSource;
 import org.apache.paimon.flink.source.table.PushedTableSource;
 import org.apache.paimon.flink.source.table.RichTableSource;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.TableLineageEntity;
+import org.apache.paimon.lineage.TableLineageEntityImpl;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Preconditions;
@@ -39,6 +44,7 @@ import org.apache.paimon.utils.Preconditions;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -48,12 +54,15 @@ import 
org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
@@ -82,6 +91,12 @@ public abstract class AbstractFlinkTableFactory
                     new SystemTableSource(((SystemCatalogTable) 
origin).table(), isStreamingMode));
         } else {
             Table table = buildPaimonTable(context);
+            if (table instanceof FileStoreTable) {
+                storeTableLineage(
+                        ((FileStoreTable) 
table).catalogEnvironment().lineageMeta(),
+                        context,
+                        (entity, lineage) -> 
lineage.saveSourceTableLineage(entity));
+            }
             DataTableSource source =
                     new DataTableSource(
                             context.getObjectIdentifier(),
@@ -97,13 +112,39 @@ public abstract class AbstractFlinkTableFactory
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
+        Table table = buildPaimonTable(context);
+        if (table instanceof FileStoreTable) {
+            storeTableLineage(
+                    ((FileStoreTable) 
table).catalogEnvironment().lineageMeta(),
+                    context,
+                    (entity, lineage) -> lineage.saveSinkTableLineage(entity));
+        }
         return new FlinkTableSink(
                 context.getObjectIdentifier(),
-                buildPaimonTable(context),
+                table,
                 context,
                 createOptionalLogStoreFactory(context).orElse(null));
     }
 
+    private void storeTableLineage(
+            @Nullable LineageMeta lineageMeta,
+            Context context,
+            BiConsumer<TableLineageEntity, LineageMeta> tableLineage) {
+        if (lineageMeta != null) {
+            String pipelineName = 
context.getConfiguration().get(PipelineOptions.NAME);
+            if (pipelineName == null) {
+                throw new ValidationException("Cannot get pipeline name for 
lineage meta.");
+            }
+            tableLineage.accept(
+                    new TableLineageEntityImpl(
+                            context.getObjectIdentifier().getDatabaseName(),
+                            context.getObjectIdentifier().getObjectName(),
+                            pipelineName,
+                            
Timestamp.fromEpochMillis(System.currentTimeMillis())),
+                    lineageMeta);
+        }
+    }
+
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
         return Collections.emptySet();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
index d49afc681..027f21716 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -51,7 +51,10 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 
@@ -69,14 +72,17 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
         path = getTempDirPath();
         String inferScan =
                 !inferScanParallelism() ? 
",\n'table-default.scan.infer-parallelism'='false'" : "";
+
+        Map<String, String> options = new HashMap<>(catalogOptions());
+        options.put("type", "paimon");
+        options.put("warehouse", toWarehouse(path));
         tEnv.executeSql(
                 String.format(
-                        "CREATE CATALOG %s WITH ("
-                                + "'type'='paimon', 'warehouse'='%s'"
-                                + inferScan
-                                + ")",
+                        "CREATE CATALOG %s WITH (" + "%s" + inferScan + ")",
                         catalog,
-                        toWarehouse(path)));
+                        options.entrySet().stream()
+                                .map(e -> String.format("'%s'='%s'", 
e.getKey(), e.getValue()))
+                                .collect(Collectors.joining(","))));
         tEnv.useCatalog(catalog);
 
         sEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
@@ -88,6 +94,10 @@ public abstract class CatalogITCaseBase extends 
AbstractTestBase {
         prepareEnv();
     }
 
+    protected Map<String, String> catalogOptions() {
+        return Collections.emptyMap();
+    }
+
     protected boolean inferScanParallelism() {
         return false;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
new file mode 100644
index 000000000..e31a73e6c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.lineage.DataLineageEntity;
+import org.apache.paimon.lineage.LineageMeta;
+import org.apache.paimon.lineage.LineageMetaFactory;
+import org.apache.paimon.lineage.TableLineageEntity;
+import org.apache.paimon.predicate.Predicate;
+
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** ITCase for flink table and data lineage. */
+public class FlinkLineageITCase extends CatalogITCaseBase {
+    private static final String THROWING_META = "throwing-meta";
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, 
b INT, c INT)");
+    }
+
+    @Override
+    protected Map<String, String> catalogOptions() {
+        return Collections.singletonMap(LINEAGE_META.key(), THROWING_META);
+    }
+
+    @Test
+    public void testTableLineage() {
+        // Validate for source and sink lineage when pipeline name is null
+        assertThatThrownBy(
+                        () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 
3),(4, 5, 6);").await())
+                .hasCauseExactlyInstanceOf(ValidationException.class)
+                .hasRootCauseMessage("Cannot get pipeline name for lineage 
meta.");
+        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM 
T").collect().close())
+                .hasCauseExactlyInstanceOf(ValidationException.class)
+                .hasRootCauseMessage("Cannot get pipeline name for lineage 
meta.");
+
+        // Call storeSinkTableLineage and storeSourceTableLineage methods
+        tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, 
"insert_t_job");
+        assertThatThrownBy(
+                        () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 
3),(4, 5, 6);").await())
+                .hasCauseExactlyInstanceOf(UnsupportedOperationException.class)
+                .hasRootCauseMessage("Method saveSinkTableLineage is not 
supported");
+
+        tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, 
"select_t_job");
+        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM 
T").collect().close())
+                .hasCauseExactlyInstanceOf(UnsupportedOperationException.class)
+                .hasRootCauseMessage("Method saveSourceTableLineage is not 
supported");
+    }
+
+    /** Factory to create throwing lineage meta. */
+    public static class ThrowingLineageMetaFactory implements 
LineageMetaFactory {
+        @Override
+        public String identifier() {
+            return THROWING_META;
+        }
+
+        @Override
+        public LineageMeta create(LineageMetaContext context) {
+            return new ThrowingLineageMeta();
+        }
+    }
+
+    /** Throwing specific exception in each method. */
+    private static class ThrowingLineageMeta implements LineageMeta {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void saveSourceTableLineage(TableLineageEntity entity) {
+            throw new UnsupportedOperationException(
+                    "Method saveSourceTableLineage is not supported");
+        }
+
+        @Override
+        public void deleteSourceTableLineage(String job) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<TableLineageEntity> sourceTableLineages(@Nullable 
Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void saveSinkTableLineage(TableLineageEntity entity) {
+            assertEquals("insert_t_job", entity.getJob());
+            assertEquals("T", entity.getTable());
+            assertEquals("default", entity.getDatabase());
+            throw new UnsupportedOperationException("Method 
saveSinkTableLineage is not supported");
+        }
+
+        @Override
+        public Iterator<TableLineageEntity> sinkTableLineages(@Nullable 
Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void deleteSinkTableLineage(String job) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void saveSourceDataLineage(DataLineageEntity entity) {
+            assertEquals("select_t_job", entity.getJob());
+            assertEquals("T", entity.getTable());
+            assertEquals("default", entity.getDatabase());
+            throw new UnsupportedOperationException("Method 
saveSinkTableLineage is not supported");
+        }
+
+        @Override
+        public Iterator<DataLineageEntity> sourceDataLineages(@Nullable 
Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void saveSinkDataLineage(DataLineageEntity entity) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Iterator<DataLineageEntity> sinkDataLineages(@Nullable 
Predicate predicate) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 014b45dc6..eb5278738 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.DataType;
@@ -131,7 +132,6 @@ public class FlinkSinkTest {
                 tablePath,
                 tableSchema,
                 conf,
-                Lock.emptyFactory(),
-                null);
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6b51f027e..71135fa1e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,7 @@
 org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory
 org.apache.paimon.flink.action.cdc.mysql.TestAlterTableCatalogFactory
 
-org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
\ No newline at end of file
+org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
+
+# Lineage meta factory
+org.apache.paimon.flink.FlinkLineageITCase$ThrowingLineageMetaFactory
\ No newline at end of file

Reply via email to