This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c2d953556a [FLINK-36625] Add lineage helper class for connector integration
1c2d953556a is described below

commit 1c2d953556ad384f3fe4eb1e0faff595dd9d0b1d
Author: Peter Huang <[email protected]>
AuthorDate: Mon Nov 17 20:43:16 2025 -0800

    [FLINK-36625] Add lineage helper class for connector integration
---
 .../api/lineage/DefaultLineageVertex.java          |  4 ++
 .../api/lineage/DefaultSourceLineageVertex.java    |  7 ++-
 .../api/lineage/DefaultTypeDatasetFacet.java       | 62 +++++++++++++++++++++
 .../flink/streaming/api/lineage/LineageUtils.java  | 53 ++++++++++++++++++
 ...ultLineageVertex.java => TypeDatasetFacet.java} | 27 +++------
 ...geVertex.java => TypeDatasetFacetProvider.java} | 29 ++++------
 .../api/lineage/LineageGraphUtilsTest.java         |  9 +--
 .../streaming/api/lineage/LineageUtilsTest.java    | 65 ++++++++++++++++++++++
 .../execution/JobStatusChangedListenerITCase.java  |  6 +-
 9 files changed, 212 insertions(+), 50 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
index 289021e3d2d..2396599681e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
@@ -32,6 +32,10 @@ public class DefaultLineageVertex implements LineageVertex {
         this.lineageDatasets = new ArrayList<>();
     }
 
+    /**
+     * Creates a vertex holding the given datasets.
+     *
+     * <p>The datasets are copied into a new mutable list, so callers may pass an immutable list
+     * (e.g. {@code Collections.singletonList}) and {@link #addLineageDataset(LineageDataset)}
+     * still works. Later changes to the caller's list are not reflected in this vertex.
+     *
+     * @param lineageDatasets initial datasets of this vertex
+     */
+    public DefaultLineageVertex(List<LineageDataset> lineageDatasets) {
+        this.lineageDatasets = new ArrayList<>(lineageDatasets);
+    }
+
     public void addLineageDataset(LineageDataset lineageDataset) {
         this.lineageDatasets.add(lineageDataset);
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
index fbc4ac4b2d4..58d2006cf7e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
@@ -32,8 +32,13 @@ public class DefaultSourceLineageVertex implements 
SourceLineageVertex {
     private List<LineageDataset> lineageDatasets;
 
     public DefaultSourceLineageVertex(Boundedness boundedness) {
-        this.lineageDatasets = new ArrayList<>();
+        this(boundedness, new ArrayList<>());
+    }
+
+    /**
+     * Creates a source vertex with the given boundedness and datasets.
+     *
+     * <p>The datasets are copied into a new mutable list, matching the single-argument
+     * constructor, so callers may pass an immutable list (e.g. {@code Collections.singletonList})
+     * without the vertex's own list becoming unmodifiable or aliased to the caller's list.
+     *
+     * @param boundedness boundedness of the source
+     * @param lineageDatasets initial datasets of this vertex
+     */
+    public DefaultSourceLineageVertex(
+            Boundedness boundedness, List<LineageDataset> lineageDatasets) {
         this.boundedness = boundedness;
+        this.lineageDatasets = new ArrayList<>(lineageDatasets);
     }
 
     public void addDataset(LineageDataset lineageDataset) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java
new file mode 100644
index 00000000000..b3675f2ae92
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Objects;
+
+/** Default implementation of {@link TypeDatasetFacet}. */
+@PublicEvolving
+public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
+
+    /** Facet name returned by {@link #name()}. */
+    public static final String TYPE_FACET_NAME = "type";
+
+    private final TypeInformation typeInformation;
+
+    /**
+     * Creates a type facet.
+     *
+     * @param typeInformation type information of the dataset; must not be null
+     * @throws NullPointerException if {@code typeInformation} is null
+     */
+    public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
+        // TypeDatasetFacet#getTypeInformation() is declared @Nonnull, so reject null up front
+        // instead of handing it out later.
+        this.typeInformation = Objects.requireNonNull(typeInformation, "typeInformation");
+    }
+
+    @Override
+    public TypeInformation getTypeInformation() {
+        return typeInformation;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
+        return Objects.equals(typeInformation, that.typeInformation);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(typeInformation);
+    }
+
+    @Override
+    public String toString() {
+        return "DefaultTypeDatasetFacet{typeInformation=" + typeInformation + '}';
+    }
+
+    @Override
+    public String name() {
+        return TYPE_FACET_NAME;
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java
new file mode 100644
index 00000000000..363def24a13
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utility methods for creating default {@link LineageDataset}, {@link LineageVertex} and {@link
+ * SourceLineageVertex} instances.
+ */
+@Internal
+public class LineageUtils {
+
+    /**
+     * Creates a {@link LineageDataset} whose only facet is the given type facet.
+     *
+     * @param name name of the dataset
+     * @param namespace namespace of the dataset
+     * @param typeDatasetFacet facet carrying the type information of the dataset
+     * @return a new dataset with exactly one facet, keyed by {@link LineageDatasetFacet#name()}
+     */
+    public static LineageDataset datasetOf(
+            String name, String namespace, TypeDatasetFacet typeDatasetFacet) {
+        return datasetOf(name, namespace, Collections.singletonList(typeDatasetFacet));
+    }
+
+    /**
+     * Creates a {@link LineageDataset} from a list of facets, keyed by {@link
+     * LineageDatasetFacet#name()}.
+     *
+     * <p>Facets keep the order of the given list. Facet names must be unique because they are
+     * used as map keys.
+     *
+     * @param name name of the dataset
+     * @param namespace namespace of the dataset
+     * @param facets facets to attach to the dataset
+     * @return a new dataset holding the given facets
+     * @throws IllegalArgumentException if two facets share the same name
+     */
+    public static LineageDataset datasetOf(
+            String name, String namespace, List<LineageDatasetFacet> facets) {
+        return new DefaultLineageDataset(
+                name,
+                namespace,
+                facets.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        LineageDatasetFacet::name,
+                                        facet -> facet,
+                                        LineageUtils::rejectDuplicateFacet,
+                                        LinkedHashMap::new)));
+    }
+
+    /**
+     * Creates a {@link SourceLineageVertex} with the given boundedness and a single dataset.
+     *
+     * @param boundedness boundedness of the source
+     * @param dataset the only dataset of the vertex
+     * @return a new source lineage vertex
+     */
+    public static SourceLineageVertex sourceLineageVertexOf(
+            Boundedness boundedness, LineageDataset dataset) {
+        return new DefaultSourceLineageVertex(boundedness, Collections.singletonList(dataset));
+    }
+
+    /**
+     * Creates a {@link LineageVertex} with a single dataset.
+     *
+     * @param dataset the only dataset of the vertex
+     * @return a new lineage vertex
+     */
+    public static LineageVertex lineageVertexOf(LineageDataset dataset) {
+        return new DefaultLineageVertex(Collections.singletonList(dataset));
+    }
+
+    /** Merge function for facet maps; facet names must be unique, so any collision is an error. */
+    private static LineageDatasetFacet rejectDuplicateFacet(
+            LineageDatasetFacet first, LineageDatasetFacet second) {
+        throw new IllegalArgumentException(
+                "Duplicate lineage dataset facet name: " + first.name());
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java
similarity index 59%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java
index 289021e3d2d..54d383c7532 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java
@@ -18,26 +18,15 @@
 
 package org.apache.flink.streaming.api.lineage;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-import java.util.ArrayList;
-import java.util.List;
+import javax.annotation.Nonnull;
 
-/** Default implementation for {@link LineageVertex}. */
-@Internal
-public class DefaultLineageVertex implements LineageVertex {
-    private List<LineageDataset> lineageDatasets;
+/** Facet definition to contain type information of source and sink. */
+@PublicEvolving
+public interface TypeDatasetFacet extends LineageDatasetFacet {
 
-    public DefaultLineageVertex() {
-        this.lineageDatasets = new ArrayList<>();
-    }
-
-    public void addLineageDataset(LineageDataset lineageDataset) {
-        this.lineageDatasets.add(lineageDataset);
-    }
-
-    @Override
-    public List<LineageDataset> datasets() {
-        return lineageDatasets;
-    }
+    /**
+     * Returns the {@link TypeInformation} of the dataset described by this facet.
+     *
+     * @return the type information; never {@code null}
+     */
+    @Nonnull
+    TypeInformation getTypeInformation();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java
similarity index 59%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java
index 289021e3d2d..2ff65599ea3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java
@@ -18,26 +18,17 @@
 
 package org.apache.flink.streaming.api.lineage;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Optional;
 
-/** Default implementation for {@link LineageVertex}. */
-@Internal
-public class DefaultLineageVertex implements LineageVertex {
-    private List<LineageDataset> lineageDatasets;
+/** Contains method to extract {@link TypeDatasetFacet}. */
+@PublicEvolving
+public interface TypeDatasetFacetProvider {
 
-    public DefaultLineageVertex() {
-        this.lineageDatasets = new ArrayList<>();
-    }
-
-    public void addLineageDataset(LineageDataset lineageDataset) {
-        this.lineageDatasets.add(lineageDataset);
-    }
-
-    @Override
-    public List<LineageDataset> datasets() {
-        return lineageDatasets;
-    }
+    /**
+     * Returns the {@link TypeDatasetFacet}, or {@link Optional#empty()} if the implementing class
+     * is not able to resolve the type.
+     *
+     * @return the type dataset facet, or an empty {@link Optional} if the type cannot be resolved
+     */
+    Optional<TypeDatasetFacet> getTypeDatasetFacet();
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
index f9fe10f9ae4..1226c8a0665 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
@@ -194,9 +194,7 @@ class LineageGraphUtilsTest {
             LineageDataset lineageDataset =
                     new DefaultLineageDataset(
                             SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new 
HashMap<>());
-            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
-            lineageVertex.addLineageDataset(lineageDataset);
-            return lineageVertex;
+            return LineageUtils.lineageVertexOf(lineageDataset);
         }
     }
 
@@ -212,10 +210,7 @@ class LineageGraphUtilsTest {
             LineageDataset lineageDataset =
                     new DefaultLineageDataset(
                             SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new 
HashMap<>());
-            DefaultSourceLineageVertex lineageVertex =
-                    new DefaultSourceLineageVertex(Boundedness.BOUNDED);
-            lineageVertex.addDataset(lineageDataset);
-            return lineageVertex;
+            return LineageUtils.sourceLineageVertexOf(Boundedness.BOUNDED, 
lineageDataset);
         }
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java
new file mode 100644
index 00000000000..0a2e9a2afb6
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtils}. */
+public class LineageUtilsTest {
+    private static final String TEST_NAME = "testName";
+    private static final String TEST_NAMESPACE = "testNameSpace";
+
+    @Test
+    public void testDataSetOf() {
+        DefaultTypeDatasetFacet facet = new DefaultTypeDatasetFacet(Types.BIG_INT);
+
+        LineageDataset dataset = LineageUtils.datasetOf(TEST_NAME, TEST_NAMESPACE, facet);
+
+        assertThat(dataset.name()).isEqualTo(TEST_NAME);
+        assertThat(dataset.namespace()).isEqualTo(TEST_NAMESPACE);
+        assertThat(dataset.facets()).hasSize(1).containsEntry(facet.name(), facet);
+    }
+
+    @Test
+    public void testSourceLineageVertexOf() {
+        LineageDataset dataset = newBigIntDataset();
+
+        SourceLineageVertex vertex =
+                LineageUtils.sourceLineageVertexOf(Boundedness.CONTINUOUS_UNBOUNDED, dataset);
+
+        assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+        assertThat(vertex.datasets()).containsExactly(dataset);
+    }
+
+    @Test
+    public void testLineageVertexOf() {
+        LineageDataset dataset = newBigIntDataset();
+
+        LineageVertex vertex = LineageUtils.lineageVertexOf(dataset);
+
+        assertThat(vertex.datasets()).containsExactly(dataset);
+    }
+
+    /** Creates the dataset shared by the vertex tests, carrying a {@code BIG_INT} type facet. */
+    private static LineageDataset newBigIntDataset() {
+        return LineageUtils.datasetOf(
+                TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT));
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
index 0478b6d409f..b9cc3c1313a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
@@ -37,10 +37,10 @@ import 
org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
-import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
 import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
 import org.apache.flink.streaming.api.lineage.LineageDataset;
 import org.apache.flink.streaming.api.lineage.LineageGraph;
+import org.apache.flink.streaming.api.lineage.LineageUtils;
 import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
 import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
@@ -322,9 +322,7 @@ public class JobStatusChangedListenerITCase extends 
TestLogger {
             LineageDataset lineageDataset =
                     new DefaultLineageDataset(
                             SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new 
HashMap<>());
-            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
-            lineageVertex.addLineageDataset(lineageDataset);
-            return lineageVertex;
+            return LineageUtils.lineageVertexOf(lineageDataset);
         }
     }
 }

Reply via email to