This is an automated email from the ASF dual-hosted git repository.

yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 063be6f9fd [flink] Implement FLIP-314 LineageVertexProvider for source and sink connectors (#7311)
063be6f9fd is described below

commit 063be6f9fd2eef75e155cc06089d9ca57ca873b9
Author: jsingh-yelp <[email protected]>
AuthorDate: Tue Mar 10 21:33:18 2026 -0400

    [flink] Implement FLIP-314 LineageVertexProvider for source and sink connectors (#7311)
---
 .../paimon/flink/PaimonDataStreamScanProvider.java |  26 ++-
 .../paimon/flink/PaimonDataStreamSinkProvider.java |  24 ++-
 .../apache/paimon/flink/lineage/LineageUtils.java  |  98 ++++++++++
 .../paimon/flink/lineage/PaimonLineageDataset.java |  72 +++++++
 .../flink/lineage/PaimonSinkLineageVertex.java     |  39 ++++
 .../flink/lineage/PaimonSourceLineageVertex.java   |  50 +++++
 .../paimon/flink/sink/FlinkFormatTableSink.java    |   4 +-
 .../paimon/flink/sink/FlinkTableSinkBase.java      |  10 +-
 .../paimon/flink/source/BaseDataTableSource.java   |   5 +-
 .../paimon/flink/source/SystemTableSource.java     |   4 +-
 .../paimon/flink/lineage/LineageUtilsTest.java     | 208 +++++++++++++++++++++
 .../streaming/api/lineage/DatasetConfigFacet.java  |  29 +++
 .../streaming/api/lineage/LineageDataset.java      |  33 ++++
 .../streaming/api/lineage/LineageDatasetFacet.java |  27 +++
 .../flink/streaming/api/lineage/LineageVertex.java |  29 +++
 .../api/lineage/LineageVertexProvider.java         |  32 ++++
 .../streaming/api/lineage/SourceLineageVertex.java |  28 +++
 17 files changed, 706 insertions(+), 12 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
index af9df5b933..776575dc0a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
@@ -18,24 +18,39 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.flink.lineage.LineageUtils;
+import org.apache.paimon.table.Table;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.data.RowData;
 
 import java.util.function.Function;
 
-/** Paimon {@link DataStreamScanProvider}. */
-public class PaimonDataStreamScanProvider implements DataStreamScanProvider {
+/**
+ * Paimon {@link DataStreamScanProvider} that also implements {@link LineageVertexProvider} so
+ * Flink's lineage graph discovers the Paimon source table.
+ */
+public class PaimonDataStreamScanProvider implements DataStreamScanProvider, 
LineageVertexProvider {
 
     private final boolean isBounded;
     private final Function<StreamExecutionEnvironment, DataStream<RowData>> 
producer;
+    private final String name;
+    private final Table table;
 
     public PaimonDataStreamScanProvider(
-            boolean isBounded, Function<StreamExecutionEnvironment, 
DataStream<RowData>> producer) {
+            boolean isBounded,
+            Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
+            String name,
+            Table table) {
         this.isBounded = isBounded;
         this.producer = producer;
+        this.name = name;
+        this.table = table;
     }
 
     @Override
@@ -48,4 +63,9 @@ public class PaimonDataStreamScanProvider implements 
DataStreamScanProvider {
     public boolean isBounded() {
         return isBounded;
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        return LineageUtils.sourceLineageVertex(name, isBounded, table);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
index 05eaacf5ab..f9087b49fc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
@@ -18,21 +18,34 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.flink.lineage.LineageUtils;
+import org.apache.paimon.table.Table;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.data.RowData;
 
 import java.util.function.Function;
 
-/** Paimon {@link DataStreamSinkProvider}. */
-public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
+/**
+ * Paimon {@link DataStreamSinkProvider} that also implements {@link 
LineageVertexProvider} so
+ * Flink's lineage graph discovers the Paimon sink table.
+ */
+public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider, 
LineageVertexProvider {
 
     private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
+    private final String name;
+    private final Table table;
 
-    public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, 
DataStreamSink<?>> producer) {
+    public PaimonDataStreamSinkProvider(
+            Function<DataStream<RowData>, DataStreamSink<?>> producer, String 
name, Table table) {
         this.producer = producer;
+        this.name = name;
+        this.table = table;
     }
 
     @Override
@@ -40,4 +53,9 @@ public class PaimonDataStreamSinkProvider implements 
DataStreamSinkProvider {
             ProviderContext providerContext, DataStream<RowData> dataStream) {
         return producer.apply(dataStream);
     }
+
+    @Override
+    public LineageVertex getLineageVertex() {
+        return LineageUtils.sinkLineageVertex(name, table);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java
new file mode 100644
index 0000000000..110365c76e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Lineage utilities for building {@link SourceLineageVertex} and {@link LineageVertex} from a
+ * Paimon table name and its physical warehouse path (namespace).
+ */
+public class LineageUtils {
+
+    private static final String PAIMON_DATASET_PREFIX = "paimon://";
+
+    private static final Set<String> PAIMON_OPTION_KEYS =
+            CoreOptions.getOptions().stream().map(opt -> 
opt.key()).collect(Collectors.toSet());
+
+    /**
+     * Builds the config map for a dataset facet from a {@link Table}. Includes filtered Paimon
+     * {@link CoreOptions}, partition keys, and primary keys.
+     */
+    private static Map<String, String> buildConfigMap(Table table) {
+        Map<String, String> config = new HashMap<>();
+        config.put("partition-keys", String.join(",", table.partitionKeys()));
+        config.put("primary-keys", String.join(",", table.primaryKeys()));
+
+        table.options().entrySet().stream()
+                .filter(e -> PAIMON_OPTION_KEYS.contains(e.getKey()))
+                .forEach(e -> config.put(e.getKey(), e.getValue()));
+
+        return config;
+    }
+
+    /**
+     * Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://}
+     * scheme followed by the table's physical warehouse path, e.g. {@code
+     * "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}.
+     */
+    public static String getNamespace(Table table) {
+        return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options());
+    }
+
+    /**
+     * Creates a {@link SourceLineageVertex} for a Paimon source table.
+     *
+     * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
+     * @param isBounded whether the source is bounded (batch) or unbounded (streaming)
+     * @param table the Paimon table (namespace is derived from its {@code path} option)
+     */
+    public static SourceLineageVertex sourceLineageVertex(
+            String name, boolean isBounded, Table table) {
+        LineageDataset dataset =
+                new PaimonLineageDataset(name, getNamespace(table), 
buildConfigMap(table));
+        Boundedness boundedness =
+                isBounded ? Boundedness.BOUNDED : 
Boundedness.CONTINUOUS_UNBOUNDED;
+        return new PaimonSourceLineageVertex(boundedness, 
Collections.singletonList(dataset));
+    }
+
+    /**
+     * Creates a {@link LineageVertex} for a Paimon sink table.
+     *
+     * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
+     * @param table the Paimon table (namespace is derived from its {@code path} option)
+     */
+    public static LineageVertex sinkLineageVertex(String name, Table table) {
+        LineageDataset dataset =
+                new PaimonLineageDataset(name, getNamespace(table), 
buildConfigMap(table));
+        return new PaimonSinkLineageVertex(Collections.singletonList(dataset));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java
new file mode 100644
index 0000000000..5e99df0b2d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link LineageDataset} representing a Paimon table, identified by its 
fully qualified name and
+ * physical warehouse path as the namespace.
+ */
+public class PaimonLineageDataset implements LineageDataset {
+
+    private final String name;
+    private final String namespace;
+    private final Map<String, String> tableOptions;
+
+    public PaimonLineageDataset(String name, String namespace, Map<String, 
String> tableOptions) {
+        this.name = name;
+        this.namespace = namespace;
+        this.tableOptions = tableOptions;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String namespace() {
+        return namespace;
+    }
+
+    @Override
+    public Map<String, LineageDatasetFacet> facets() {
+        Map<String, LineageDatasetFacet> facets = new HashMap<>();
+        facets.put(
+                "config",
+                new DatasetConfigFacet() {
+                    @Override
+                    public String name() {
+                        return "config";
+                    }
+
+                    @Override
+                    public Map<String, String> config() {
+                        return tableOptions;
+                    }
+                });
+        return facets;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java
new file mode 100644
index 0000000000..40024da5e9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+
+import java.util.List;
+
+/** A {@link LineageVertex} representing a Paimon sink table. */
+public class PaimonSinkLineageVertex implements LineageVertex {
+
+    private final List<LineageDataset> datasets;
+
+    public PaimonSinkLineageVertex(List<LineageDataset> datasets) {
+        this.datasets = datasets;
+    }
+
+    @Override
+    public List<LineageDataset> datasets() {
+        return datasets;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java
new file mode 100644
index 0000000000..cbacce2f8a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.List;
+
+/**
+ * A {@link SourceLineageVertex} representing a Paimon source table. Carries 
the {@link Boundedness}
+ * to indicate whether the source is bounded (batch) or unbounded (streaming).
+ */
+public class PaimonSourceLineageVertex implements SourceLineageVertex {
+
+    private final Boundedness boundedness;
+    private final List<LineageDataset> datasets;
+
+    public PaimonSourceLineageVertex(Boundedness boundedness, 
List<LineageDataset> datasets) {
+        this.boundedness = boundedness;
+        this.datasets = datasets;
+    }
+
+    @Override
+    public Boundedness boundedness() {
+        return boundedness;
+    }
+
+    @Override
+    public List<LineageDataset> datasets() {
+        return datasets;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
index 361323f016..1c48602098 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
@@ -60,7 +60,9 @@ public class FlinkFormatTableSink
         return new PaimonDataStreamSinkProvider(
                 (dataStream) ->
                         new FlinkFormatTableDataStreamSink(table, overwrite, 
staticPartitions)
-                                .sinkFrom(dataStream));
+                                .sinkFrom(dataStream),
+                tableIdentifier.asSummaryString(),
+                table);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index fb68793399..2790a8a926 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -105,13 +105,17 @@ public abstract class FlinkTableSinkBase
             throw new UnsupportedOperationException(
                     "Paimon doesn't support streaming INSERT OVERWRITE.");
         }
+        String name = tableIdentifier.asSummaryString();
+
         if (table instanceof FormatTable) {
             FormatTable formatTable = (FormatTable) table;
             return new PaimonDataStreamSinkProvider(
                     (dataStream) ->
                             new FlinkFormatTableDataStreamSink(
                                             formatTable, overwrite, 
staticPartitions)
-                                    .sinkFrom(dataStream));
+                                    .sinkFrom(dataStream),
+                    name,
+                    table);
         }
 
         Options conf = Options.fromMap(table.options());
@@ -136,7 +140,9 @@ public abstract class FlinkTableSinkBase
                     }
                     
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
                     return builder.build();
-                });
+                },
+                name,
+                table);
     }
 
     protected FlinkSinkBuilder createSinkBuilder() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 8e1ecfc955..5227fa0d27 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -195,14 +195,15 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
                         .limit(limit)
                         .watermarkStrategy(watermarkStrategy)
                         
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());
-
         return new PaimonDataStreamScanProvider(
                 !unbounded,
                 env ->
                         sourceBuilder
                                 .sourceParallelism(inferSourceParallelism(env))
                                 .env(env)
-                                .build());
+                                .build(),
+                tableIdentifier.asSummaryString(),
+                table);
     }
 
     private ScanRuntimeProvider createCountStarScan() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e99f265d03..2bbd749f58 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -130,7 +130,9 @@ public class SystemTableSource extends FlinkTableSource {
                         dataStreamSource.setParallelism(parallelism);
                     }
                     return dataStreamSource;
-                });
+                },
+                tableIdentifier.asSummaryString(),
+                table);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java
new file mode 100644
index 0000000000..62d601ec1b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
+import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtils}. */
+class LineageUtilsTest {
+
+    @TempDir java.nio.file.Path temp;
+
+    private Path tablePath;
+
+    @BeforeEach
+    void setUp() {
+        tablePath = new Path(temp.toUri().toString());
+    }
+
+    private FileStoreTable createTable(
+            Map<String, String> options,
+            java.util.List<String> partitionKeys,
+            java.util.List<String> primaryKeys)
+            throws Exception {
+        new SchemaManager(LocalFileIO.create(), tablePath)
+                .createTable(
+                        new Schema(
+                                RowType.of(new IntType(), new 
VarCharType(100), new IntType())
+                                        .getFields(),
+                                partitionKeys,
+                                primaryKeys,
+                                options,
+                                ""));
+        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+    }
+
+    @Test
+    void testGetNamespace() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Arrays.asList("f0"));
+
+        String namespace = LineageUtils.getNamespace(table);
+
+        assertThat(namespace).startsWith("paimon://");
+        assertThat(namespace).contains(tablePath.toString());
+    }
+
+    @Test
+    void testSourceLineageVertexBounded() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Arrays.asList("f0"));
+
+        SourceLineageVertex vertex = 
LineageUtils.sourceLineageVertex("paimon.db.src", true, table);
+
+        assertThat(vertex).isInstanceOf(PaimonSourceLineageVertex.class);
+        assertThat(vertex.boundedness()).isEqualTo(Boundedness.BOUNDED);
+        assertThat(vertex.datasets()).hasSize(1);
+
+        LineageDataset dataset = vertex.datasets().get(0);
+        assertThat(dataset.name()).isEqualTo("paimon.db.src");
+        assertThat(dataset.namespace()).startsWith("paimon://");
+    }
+
+    @Test
+    void testSourceLineageVertexUnbounded() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Arrays.asList("f0"));
+
+        SourceLineageVertex vertex =
+                LineageUtils.sourceLineageVertex("paimon.db.src", false, 
table);
+
+        
assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+    }
+
+    @Test
+    void testSinkLineageVertex() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Arrays.asList("f0"));
+
+        LineageVertex vertex = 
LineageUtils.sinkLineageVertex("paimon.db.sink", table);
+
+        assertThat(vertex).isInstanceOf(PaimonSinkLineageVertex.class);
+        assertThat(vertex.datasets()).hasSize(1);
+
+        LineageDataset dataset = vertex.datasets().get(0);
+        assertThat(dataset.name()).isEqualTo("paimon.db.sink");
+        assertThat(dataset.namespace()).startsWith("paimon://");
+    }
+
+    @Test
+    void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Arrays.asList("f2"), 
Arrays.asList("f0", "f2"));
+
+        LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", 
table);
+        LineageDataset dataset = vertex.datasets().get(0);
+
+        Map<String, LineageDatasetFacet> facets = dataset.facets();
+        assertThat(facets).containsKey("config");
+
+        DatasetConfigFacet configFacet = (DatasetConfigFacet) 
facets.get("config");
+        Map<String, String> config = configFacet.config();
+        assertThat(config).containsEntry("partition-keys", "f2");
+        assertThat(config).containsEntry("primary-keys", "f0,f2");
+    }
+
+    @Test
+    void testConfigFacetIncludesPaimonOptions() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
+
+        FileStoreTable table = createTable(options, Collections.emptyList(), 
Arrays.asList("f0"));
+
+        SourceLineageVertex vertex = 
LineageUtils.sourceLineageVertex("paimon.db.t", true, table);
+        LineageDataset dataset = vertex.datasets().get(0);
+
+        DatasetConfigFacet configFacet = (DatasetConfigFacet) 
dataset.facets().get("config");
+        Map<String, String> config = configFacet.config();
+        assertThat(config).containsEntry(CoreOptions.CHANGELOG_PRODUCER.key(), 
"lookup");
+    }
+
+    @Test
+    void testConfigFacetWithEmptyKeys() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Collections.emptyList());
+
+        LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", 
table);
+        LineageDataset dataset = vertex.datasets().get(0);
+
+        DatasetConfigFacet configFacet = (DatasetConfigFacet) 
dataset.facets().get("config");
+        Map<String, String> config = configFacet.config();
+        assertThat(config).containsEntry("partition-keys", "");
+        assertThat(config).containsEntry("primary-keys", "");
+    }
+
+    @Test
+    void testScanProviderImplementsLineageVertexProvider() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Arrays.asList("f0"));
+
+        PaimonDataStreamScanProvider provider =
+                new PaimonDataStreamScanProvider(true, env -> null, 
"paimon.db.src", table);
+
+        assertThat(provider).isInstanceOf(LineageVertexProvider.class);
+        LineageVertex vertex = provider.getLineageVertex();
+        assertThat(vertex).isInstanceOf(SourceLineageVertex.class);
+        assertThat(vertex.datasets()).hasSize(1);
+        assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src");
+    }
+
+    @Test
+    void testSinkProviderImplementsLineageVertexProvider() throws Exception {
+        FileStoreTable table =
+                createTable(new HashMap<>(), Collections.emptyList(), 
Arrays.asList("f0"));
+
+        PaimonDataStreamSinkProvider provider =
+                new PaimonDataStreamSinkProvider(dataStream -> null, 
"paimon.db.sink", table);
+
+        assertThat(provider).isInstanceOf(LineageVertexProvider.class);
+        LineageVertex vertex = provider.getLineageVertex();
+        assertThat(vertex.datasets()).hasSize(1);
+        
assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java
new file mode 100644
index 0000000000..cbd25f6c59
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** Builtin config facet for dataset. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface DatasetConfigFacet extends LineageDatasetFacet {
+    Map<String, String> config();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java
new file mode 100644
index 0000000000..4d49b5ca77
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** Lineage dataset represents the source or sink in the job. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface LineageDataset {
+    String name();
+
+    String namespace();
+
+    Map<String, LineageDatasetFacet> facets();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java
new file mode 100644
index 0000000000..55bf57887a
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Facet interface for dataset. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface LineageDatasetFacet {
+    String name();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java
new file mode 100644
index 0000000000..43c4f991e1
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+/** Lineage vertex represents the connectors in lineage graph. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface LineageVertex {
+    List<LineageDataset> datasets();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java
new file mode 100644
index 0000000000..959e2e9625
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Create lineage vertex for source and sink in DataStream. Stub for Flink 1.x compatibility.
+ *
+ * <p>On Flink 1.x the runtime never calls {@link #getLineageVertex()}, so the implementation is
+ * effectively a no-op.
+ */
+@PublicEvolving
+public interface LineageVertexProvider {
+    LineageVertex getLineageVertex();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java
new file mode 100644
index 0000000000..ab36aaee49
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+
+/** Lineage vertex for source which has {@link Boundedness}. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface SourceLineageVertex extends LineageVertex {
+    Boundedness boundedness();
+}


Reply via email to