This is an automated email from the ASF dual-hosted git repository.
yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 063be6f9fd [flink] Implement FLIP-314 LineageVertexProvider for source
and sink connectors (#7311)
063be6f9fd is described below
commit 063be6f9fd2eef75e155cc06089d9ca57ca873b9
Author: jsingh-yelp <[email protected]>
AuthorDate: Tue Mar 10 21:33:18 2026 -0400
[flink] Implement FLIP-314 LineageVertexProvider for source and sink
connectors (#7311)
---
.../paimon/flink/PaimonDataStreamScanProvider.java | 26 ++-
.../paimon/flink/PaimonDataStreamSinkProvider.java | 24 ++-
.../apache/paimon/flink/lineage/LineageUtils.java | 98 ++++++++++
.../paimon/flink/lineage/PaimonLineageDataset.java | 72 +++++++
.../flink/lineage/PaimonSinkLineageVertex.java | 39 ++++
.../flink/lineage/PaimonSourceLineageVertex.java | 50 +++++
.../paimon/flink/sink/FlinkFormatTableSink.java | 4 +-
.../paimon/flink/sink/FlinkTableSinkBase.java | 10 +-
.../paimon/flink/source/BaseDataTableSource.java | 5 +-
.../paimon/flink/source/SystemTableSource.java | 4 +-
.../paimon/flink/lineage/LineageUtilsTest.java | 208 +++++++++++++++++++++
.../streaming/api/lineage/DatasetConfigFacet.java | 29 +++
.../streaming/api/lineage/LineageDataset.java | 33 ++++
.../streaming/api/lineage/LineageDatasetFacet.java | 27 +++
.../flink/streaming/api/lineage/LineageVertex.java | 29 +++
.../api/lineage/LineageVertexProvider.java | 32 ++++
.../streaming/api/lineage/SourceLineageVertex.java | 28 +++
17 files changed, 706 insertions(+), 12 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
index af9df5b933..776575dc0a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java
@@ -18,24 +18,39 @@
package org.apache.paimon.flink;
+import org.apache.paimon.flink.lineage.LineageUtils;
+import org.apache.paimon.table.Table;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;
import java.util.function.Function;
-/** Paimon {@link DataStreamScanProvider}. */
-public class PaimonDataStreamScanProvider implements DataStreamScanProvider {
+/**
+ * Paimon {@link DataStreamScanProvider} that also implements {@link LineageVertexProvider} so
+ * Flink's lineage graph discovers the Paimon source table.
+ */
+public class PaimonDataStreamScanProvider implements DataStreamScanProvider,
LineageVertexProvider {
private final boolean isBounded;
private final Function<StreamExecutionEnvironment, DataStream<RowData>>
producer;
+ private final String name;
+ private final Table table;
public PaimonDataStreamScanProvider(
- boolean isBounded, Function<StreamExecutionEnvironment,
DataStream<RowData>> producer) {
+ boolean isBounded,
+ Function<StreamExecutionEnvironment, DataStream<RowData>> producer,
+ String name,
+ Table table) {
this.isBounded = isBounded;
this.producer = producer;
+ this.name = name;
+ this.table = table;
}
@Override
@@ -48,4 +63,9 @@ public class PaimonDataStreamScanProvider implements
DataStreamScanProvider {
public boolean isBounded() {
return isBounded;
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ return LineageUtils.sourceLineageVertex(name, isBounded, table);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
index 05eaacf5ab..f9087b49fc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java
@@ -18,21 +18,34 @@
package org.apache.paimon.flink;
+import org.apache.paimon.flink.lineage.LineageUtils;
+import org.apache.paimon.table.Table;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;
import java.util.function.Function;
-/** Paimon {@link DataStreamSinkProvider}. */
-public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider {
+/**
+ * Paimon {@link DataStreamSinkProvider} that also implements {@link LineageVertexProvider} so
+ * Flink's lineage graph discovers the Paimon sink table.
+ */
+public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider,
LineageVertexProvider {
private final Function<DataStream<RowData>, DataStreamSink<?>> producer;
+ private final String name;
+ private final Table table;
- public PaimonDataStreamSinkProvider(Function<DataStream<RowData>,
DataStreamSink<?>> producer) {
+ public PaimonDataStreamSinkProvider(
+ Function<DataStream<RowData>, DataStreamSink<?>> producer, String
name, Table table) {
this.producer = producer;
+ this.name = name;
+ this.table = table;
}
@Override
@@ -40,4 +53,9 @@ public class PaimonDataStreamSinkProvider implements
DataStreamSinkProvider {
ProviderContext providerContext, DataStream<RowData> dataStream) {
return producer.apply(dataStream);
}
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ return LineageUtils.sinkLineageVertex(name, table);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java
new file mode 100644
index 0000000000..110365c76e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Lineage utilities for building {@link SourceLineageVertex} and {@link LineageVertex} from a
+ * Paimon table name and its physical warehouse path (namespace).
+ */
+public class LineageUtils {
+
+ private static final String PAIMON_DATASET_PREFIX = "paimon://";
+
+ private static final Set<String> PAIMON_OPTION_KEYS =
+ CoreOptions.getOptions().stream().map(opt ->
opt.key()).collect(Collectors.toSet());
+
+ /**
+ * Builds the config map for a dataset facet from a {@link Table}. Includes the partition keys,
+ * the primary keys, and the table options whose keys are listed by {@link CoreOptions}.
+ */
+ private static Map<String, String> buildConfigMap(Table table) {
+ Map<String, String> config = new HashMap<>();
+ config.put("partition-keys", String.join(",", table.partitionKeys()));
+ config.put("primary-keys", String.join(",", table.primaryKeys()));
+
+ table.options().entrySet().stream()
+ .filter(e -> PAIMON_OPTION_KEYS.contains(e.getKey()))
+ .forEach(e -> config.put(e.getKey(), e.getValue()));
+
+ return config;
+ }
+
+ /**
+ * Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://}
+ * scheme followed by the table's physical warehouse path, e.g. {@code
+ * "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}.
+ */
+ public static String getNamespace(Table table) {
+ return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options());
+ }
+
+ /**
+ * Creates a {@link SourceLineageVertex} for a Paimon source table.
+ *
+ * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
+ * @param isBounded whether the source is bounded (batch) or unbounded (streaming)
+ * @param table the Paimon table (namespace is derived from its {@code path} option)
+ */
+ public static SourceLineageVertex sourceLineageVertex(
+ String name, boolean isBounded, Table table) {
+ LineageDataset dataset =
+ new PaimonLineageDataset(name, getNamespace(table),
buildConfigMap(table));
+ Boundedness boundedness =
+ isBounded ? Boundedness.BOUNDED :
Boundedness.CONTINUOUS_UNBOUNDED;
+ return new PaimonSourceLineageVertex(boundedness,
Collections.singletonList(dataset));
+ }
+
+ /**
+ * Creates a {@link LineageVertex} for a Paimon sink table.
+ *
+ * @param name fully qualified table name, e.g. {@code "paimon.mydb.mytable"}
+ * @param table the Paimon table (namespace is derived from its {@code path} option)
+ */
+ public static LineageVertex sinkLineageVertex(String name, Table table) {
+ LineageDataset dataset =
+ new PaimonLineageDataset(name, getNamespace(table),
buildConfigMap(table));
+ return new PaimonSinkLineageVertex(Collections.singletonList(dataset));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java
new file mode 100644
index 0000000000..5e99df0b2d
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link LineageDataset} representing a Paimon table, identified by its fully qualified name and
+ * physical warehouse path as the namespace.
+ */
+public class PaimonLineageDataset implements LineageDataset {
+
+ private final String name;
+ private final String namespace;
+ private final Map<String, String> tableOptions;
+
+ public PaimonLineageDataset(String name, String namespace, Map<String,
String> tableOptions) {
+ this.name = name;
+ this.namespace = namespace;
+ this.tableOptions = tableOptions;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String namespace() {
+ return namespace;
+ }
+
+ @Override
+ public Map<String, LineageDatasetFacet> facets() {
+ Map<String, LineageDatasetFacet> facets = new HashMap<>();
+ facets.put(
+ "config",
+ new DatasetConfigFacet() {
+ @Override
+ public String name() {
+ return "config";
+ }
+
+ @Override
+ public Map<String, String> config() {
+ return tableOptions;
+ }
+ });
+ return facets;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java
new file mode 100644
index 0000000000..40024da5e9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSinkLineageVertex.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+
+import java.util.List;
+
+/** A {@link LineageVertex} representing a Paimon sink table. */
+public class PaimonSinkLineageVertex implements LineageVertex {
+
+ private final List<LineageDataset> datasets;
+
+ public PaimonSinkLineageVertex(List<LineageDataset> datasets) {
+ this.datasets = datasets;
+ }
+
+ @Override
+ public List<LineageDataset> datasets() {
+ return datasets;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java
new file mode 100644
index 0000000000..cbacce2f8a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonSourceLineageVertex.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+
+import java.util.List;
+
+/**
+ * A {@link SourceLineageVertex} representing a Paimon source table. Carries the {@link Boundedness}
+ * to indicate whether the source is bounded (batch) or unbounded (streaming).
+ */
+public class PaimonSourceLineageVertex implements SourceLineageVertex {
+
+ private final Boundedness boundedness;
+ private final List<LineageDataset> datasets;
+
+ public PaimonSourceLineageVertex(Boundedness boundedness,
List<LineageDataset> datasets) {
+ this.boundedness = boundedness;
+ this.datasets = datasets;
+ }
+
+ @Override
+ public Boundedness boundedness() {
+ return boundedness;
+ }
+
+ @Override
+ public List<LineageDataset> datasets() {
+ return datasets;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
index 361323f016..1c48602098 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableSink.java
@@ -60,7 +60,9 @@ public class FlinkFormatTableSink
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkFormatTableDataStreamSink(table, overwrite,
staticPartitions)
- .sinkFrom(dataStream));
+ .sinkFrom(dataStream),
+ tableIdentifier.asSummaryString(),
+ table);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index fb68793399..2790a8a926 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -105,13 +105,17 @@ public abstract class FlinkTableSinkBase
throw new UnsupportedOperationException(
"Paimon doesn't support streaming INSERT OVERWRITE.");
}
+ String name = tableIdentifier.asSummaryString();
+
if (table instanceof FormatTable) {
FormatTable formatTable = (FormatTable) table;
return new PaimonDataStreamSinkProvider(
(dataStream) ->
new FlinkFormatTableDataStreamSink(
formatTable, overwrite,
staticPartitions)
- .sinkFrom(dataStream));
+ .sinkFrom(dataStream),
+ name,
+ table);
}
Options conf = Options.fromMap(table.options());
@@ -136,7 +140,9 @@ public abstract class FlinkTableSinkBase
}
conf.getOptional(SINK_PARALLELISM).ifPresent(builder::parallelism);
return builder.build();
- });
+ },
+ name,
+ table);
}
protected FlinkSinkBuilder createSinkBuilder() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 8e1ecfc955..5227fa0d27 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -195,14 +195,15 @@ public abstract class BaseDataTableSource extends
FlinkTableSource
.limit(limit)
.watermarkStrategy(watermarkStrategy)
.dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());
-
return new PaimonDataStreamScanProvider(
!unbounded,
env ->
sourceBuilder
.sourceParallelism(inferSourceParallelism(env))
.env(env)
- .build());
+ .build(),
+ tableIdentifier.asSummaryString(),
+ table);
}
private ScanRuntimeProvider createCountStarScan() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e99f265d03..2bbd749f58 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -130,7 +130,9 @@ public class SystemTableSource extends FlinkTableSource {
dataStreamSource.setParallelism(parallelism);
}
return dataStreamSource;
- });
+ },
+ tableIdentifier.asSummaryString(),
+ table);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java
new file mode 100644
index 0000000000..62d601ec1b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lineage;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
+import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.lineage.DatasetConfigFacet;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
+import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtils}. */
+class LineageUtilsTest {
+
+ @TempDir java.nio.file.Path temp;
+
+ private Path tablePath;
+
+ @BeforeEach
+ void setUp() {
+ tablePath = new Path(temp.toUri().toString());
+ }
+
+ private FileStoreTable createTable(
+ Map<String, String> options,
+ java.util.List<String> partitionKeys,
+ java.util.List<String> primaryKeys)
+ throws Exception {
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ RowType.of(new IntType(), new
VarCharType(100), new IntType())
+ .getFields(),
+ partitionKeys,
+ primaryKeys,
+ options,
+ ""));
+ return FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+ }
+
+ @Test
+ void testGetNamespace() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Arrays.asList("f0"));
+
+ String namespace = LineageUtils.getNamespace(table);
+
+ assertThat(namespace).startsWith("paimon://");
+ assertThat(namespace).contains(tablePath.toString());
+ }
+
+ @Test
+ void testSourceLineageVertexBounded() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Arrays.asList("f0"));
+
+ SourceLineageVertex vertex =
LineageUtils.sourceLineageVertex("paimon.db.src", true, table);
+
+ assertThat(vertex).isInstanceOf(PaimonSourceLineageVertex.class);
+ assertThat(vertex.boundedness()).isEqualTo(Boundedness.BOUNDED);
+ assertThat(vertex.datasets()).hasSize(1);
+
+ LineageDataset dataset = vertex.datasets().get(0);
+ assertThat(dataset.name()).isEqualTo("paimon.db.src");
+ assertThat(dataset.namespace()).startsWith("paimon://");
+ }
+
+ @Test
+ void testSourceLineageVertexUnbounded() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Arrays.asList("f0"));
+
+ SourceLineageVertex vertex =
+ LineageUtils.sourceLineageVertex("paimon.db.src", false,
table);
+
+
assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+ }
+
+ @Test
+ void testSinkLineageVertex() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Arrays.asList("f0"));
+
+ LineageVertex vertex =
LineageUtils.sinkLineageVertex("paimon.db.sink", table);
+
+ assertThat(vertex).isInstanceOf(PaimonSinkLineageVertex.class);
+ assertThat(vertex.datasets()).hasSize(1);
+
+ LineageDataset dataset = vertex.datasets().get(0);
+ assertThat(dataset.name()).isEqualTo("paimon.db.sink");
+ assertThat(dataset.namespace()).startsWith("paimon://");
+ }
+
+ @Test
+ void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Arrays.asList("f2"),
Arrays.asList("f0", "f2"));
+
+ LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t",
table);
+ LineageDataset dataset = vertex.datasets().get(0);
+
+ Map<String, LineageDatasetFacet> facets = dataset.facets();
+ assertThat(facets).containsKey("config");
+
+ DatasetConfigFacet configFacet = (DatasetConfigFacet)
facets.get("config");
+ Map<String, String> config = configFacet.config();
+ assertThat(config).containsEntry("partition-keys", "f2");
+ assertThat(config).containsEntry("primary-keys", "f0,f2");
+ }
+
+ @Test
+ void testConfigFacetIncludesPaimonOptions() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup");
+
+ FileStoreTable table = createTable(options, Collections.emptyList(),
Arrays.asList("f0"));
+
+ SourceLineageVertex vertex =
LineageUtils.sourceLineageVertex("paimon.db.t", true, table);
+ LineageDataset dataset = vertex.datasets().get(0);
+
+ DatasetConfigFacet configFacet = (DatasetConfigFacet)
dataset.facets().get("config");
+ Map<String, String> config = configFacet.config();
+ assertThat(config).containsEntry(CoreOptions.CHANGELOG_PRODUCER.key(),
"lookup");
+ }
+
+ @Test
+ void testConfigFacetWithEmptyKeys() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Collections.emptyList());
+
+ LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t",
table);
+ LineageDataset dataset = vertex.datasets().get(0);
+
+ DatasetConfigFacet configFacet = (DatasetConfigFacet)
dataset.facets().get("config");
+ Map<String, String> config = configFacet.config();
+ assertThat(config).containsEntry("partition-keys", "");
+ assertThat(config).containsEntry("primary-keys", "");
+ }
+
+ @Test
+ void testScanProviderImplementsLineageVertexProvider() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Arrays.asList("f0"));
+
+ PaimonDataStreamScanProvider provider =
+ new PaimonDataStreamScanProvider(true, env -> null,
"paimon.db.src", table);
+
+ assertThat(provider).isInstanceOf(LineageVertexProvider.class);
+ LineageVertex vertex = provider.getLineageVertex();
+ assertThat(vertex).isInstanceOf(SourceLineageVertex.class);
+ assertThat(vertex.datasets()).hasSize(1);
+ assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src");
+ }
+
+ @Test
+ void testSinkProviderImplementsLineageVertexProvider() throws Exception {
+ FileStoreTable table =
+ createTable(new HashMap<>(), Collections.emptyList(),
Arrays.asList("f0"));
+
+ PaimonDataStreamSinkProvider provider =
+ new PaimonDataStreamSinkProvider(dataStream -> null,
"paimon.db.sink", table);
+
+ assertThat(provider).isInstanceOf(LineageVertexProvider.class);
+ LineageVertex vertex = provider.getLineageVertex();
+ assertThat(vertex.datasets()).hasSize(1);
+
assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink");
+ }
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java
new file mode 100644
index 0000000000..cbd25f6c59
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/DatasetConfigFacet.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** Builtin config facet for dataset. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface DatasetConfigFacet extends LineageDatasetFacet {
+ Map<String, String> config();
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java
new file mode 100644
index 0000000000..4d49b5ca77
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/** Lineage dataset represents the source or sink in the job. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface LineageDataset {
+ String name();
+
+ String namespace();
+
+ Map<String, LineageDatasetFacet> facets();
+}
diff --git
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java
new file mode 100644
index 0000000000..55bf57887a
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageDatasetFacet.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Facet interface for a {@link LineageDataset}. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface LineageDatasetFacet {
+ String name();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java
new file mode 100644
index 0000000000..43c4f991e1
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertex.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+
+/** Lineage vertex represents the connectors in lineage graph. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface LineageVertex {
+ List<LineageDataset> datasets();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java
new file mode 100644
index 0000000000..959e2e9625
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/LineageVertexProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Create lineage vertex for source and sink in DataStream. Stub for Flink 1.x compatibility.
+ *
+ * <p>On Flink 1.x the runtime never calls {@link #getLineageVertex()}, so the implementation is
+ * effectively a no-op.
+ */
+@PublicEvolving
+public interface LineageVertexProvider {
+ LineageVertex getLineageVertex();
+}
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java
new file mode 100644
index 0000000000..ab36aaee49
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/lineage/SourceLineageVertex.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+
+/** Lineage vertex for source which has {@link Boundedness}. Stub for Flink 1.x compatibility. */
+@PublicEvolving
+public interface SourceLineageVertex extends LineageVertex {
+ Boundedness boundedness();
+}