This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1c2d953556a [FLINK-36625] Add lineage helper class for connector
integration
1c2d953556a is described below
commit 1c2d953556ad384f3fe4eb1e0faff595dd9d0b1d
Author: Peter Huang <[email protected]>
AuthorDate: Mon Nov 17 20:43:16 2025 -0800
[FLINK-36625] Add lineage helper class for connector integration
---
.../api/lineage/DefaultLineageVertex.java | 4 ++
.../api/lineage/DefaultSourceLineageVertex.java | 7 ++-
.../api/lineage/DefaultTypeDatasetFacet.java | 62 +++++++++++++++++++++
.../flink/streaming/api/lineage/LineageUtils.java | 53 ++++++++++++++++++
...ultLineageVertex.java => TypeDatasetFacet.java} | 27 +++------
...geVertex.java => TypeDatasetFacetProvider.java} | 29 ++++------
.../api/lineage/LineageGraphUtilsTest.java | 9 +--
.../streaming/api/lineage/LineageUtilsTest.java | 65 ++++++++++++++++++++++
.../execution/JobStatusChangedListenerITCase.java | 6 +-
9 files changed, 212 insertions(+), 50 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
index 289021e3d2d..2396599681e 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
@@ -32,6 +32,10 @@ public class DefaultLineageVertex implements LineageVertex {
this.lineageDatasets = new ArrayList<>();
}
+ /**
+ * Creates a vertex initialized with a copy of the given datasets.
+ *
+ * <p>The list is copied into a new {@link ArrayList}, so {@link
+ * #addLineageDataset(LineageDataset)} keeps working when an immutable list (for example {@code
+ * Collections.singletonList(...)}) is passed, and later changes to the caller's list do not
+ * affect this vertex.
+ *
+ * @param lineageDatasets initial datasets of this vertex; must not be {@code null}
+ */
+ public DefaultLineageVertex(List<LineageDataset> lineageDatasets) {
+ this.lineageDatasets = new ArrayList<>(lineageDatasets);
+ }
+
public void addLineageDataset(LineageDataset lineageDataset) {
this.lineageDatasets.add(lineageDataset);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
index fbc4ac4b2d4..58d2006cf7e 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
@@ -32,8 +32,13 @@ public class DefaultSourceLineageVertex implements
SourceLineageVertex {
private List<LineageDataset> lineageDatasets;
+ /**
+ * Creates a source vertex with the given boundedness and no datasets.
+ *
+ * @param boundedness boundedness of the source
+ */
public DefaultSourceLineageVertex(Boundedness boundedness) {
- this.lineageDatasets = new ArrayList<>();
+ this(boundedness, new ArrayList<>());
+ }
+
+ /**
+ * Creates a source vertex with the given boundedness, initialized with a copy of the given
+ * datasets.
+ *
+ * <p>The list is copied into a new {@link ArrayList}, so {@link #addDataset(LineageDataset)}
+ * keeps working when an immutable list is passed, and later changes to the caller's list do not
+ * affect this vertex.
+ *
+ * @param boundedness boundedness of the source
+ * @param lineageDatasets initial datasets of this vertex; must not be {@code null}
+ */
+ public DefaultSourceLineageVertex(
+ Boundedness boundedness, List<LineageDataset> lineageDatasets) {
this.boundedness = boundedness;
+ this.lineageDatasets = new ArrayList<>(lineageDatasets);
}
public void addDataset(LineageDataset lineageDataset) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java
new file mode 100644
index 00000000000..b3675f2ae92
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultTypeDatasetFacet.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Objects;
+
+/**
+ * Default implementation of {@link TypeDatasetFacet}, holding the {@link TypeInformation} of a
+ * lineage dataset under the facet name {@value #TYPE_FACET_NAME}.
+ */
+@PublicEvolving
+public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
+
+ /** Facet name returned by {@link #name()}. */
+ public static final String TYPE_FACET_NAME = "type";
+
+ private final TypeInformation typeInformation;
+
+ /**
+ * Creates a facet for the given type information.
+ *
+ * @param typeInformation type information of the dataset
+ * @throws NullPointerException if {@code typeInformation} is {@code null}; {@link
+ *     TypeDatasetFacet#getTypeInformation()} is declared {@code @Nonnull}
+ */
+ public DefaultTypeDatasetFacet(TypeInformation typeInformation) {
+ this.typeInformation =
+ Objects.requireNonNull(typeInformation, "typeInformation must not be null");
+ }
+
+ @Override
+ public TypeInformation getTypeInformation() {
+ return typeInformation;
+ }
+
+ /** Two facets are equal when they are of the same class and hold equal type information. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o;
+ return Objects.equals(typeInformation, that.typeInformation);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(typeInformation);
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultTypeDatasetFacet{typeInformation=" + typeInformation + "}";
+ }
+
+ @Override
+ public String name() {
+ return TYPE_FACET_NAME;
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java
new file mode 100644
index 00000000000..363def24a13
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utility methods for creating default {@link LineageDataset}, {@link LineageVertex} and {@link
+ * SourceLineageVertex} instances.
+ */
+@Internal
+public final class LineageUtils {
+
+ private LineageUtils() {
+ // Static utility class; not meant to be instantiated.
+ }
+
+ /**
+ * Creates a {@link DefaultLineageDataset} whose only facet is the given type facet.
+ *
+ * @param name name of the dataset
+ * @param namespace namespace of the dataset
+ * @param typeDatasetFacet the single facet, keyed by its {@link LineageDatasetFacet#name()}
+ * @return the created dataset
+ */
+ public static LineageDataset datasetOf(
+ String name, String namespace, TypeDatasetFacet typeDatasetFacet) {
+ return datasetOf(name, namespace, Collections.singletonList(typeDatasetFacet));
+ }
+
+ /**
+ * Creates a {@link DefaultLineageDataset} whose facets are keyed by {@link
+ * LineageDatasetFacet#name()}.
+ *
+ * @param name name of the dataset
+ * @param namespace namespace of the dataset
+ * @param facets facets of the dataset
+ * @return the created dataset
+ * @throws IllegalStateException if two facets report the same name
+ * @throws NullPointerException if {@code facets} or any of its elements is {@code null}
+ */
+ public static LineageDataset datasetOf(
+ String name, String namespace, List<LineageDatasetFacet> facets) {
+ return new DefaultLineageDataset(
+ name,
+ namespace,
+ facets.stream().collect(Collectors.toMap(LineageDatasetFacet::name, facet -> facet)));
+ }
+
+ /**
+ * Creates a {@link DefaultSourceLineageVertex} with the given boundedness whose only dataset is
+ * {@code dataset}.
+ *
+ * @param boundedness boundedness of the source
+ * @param dataset the dataset read by the source
+ * @return the created source vertex
+ */
+ public static SourceLineageVertex sourceLineageVertexOf(
+ Boundedness boundedness, LineageDataset dataset) {
+ return new DefaultSourceLineageVertex(boundedness, Collections.singletonList(dataset));
+ }
+
+ /**
+ * Creates a {@link DefaultLineageVertex} whose only dataset is {@code dataset}.
+ *
+ * @param dataset the dataset of the vertex
+ * @return the created vertex
+ */
+ public static LineageVertex lineageVertexOf(LineageDataset dataset) {
+ return new DefaultLineageVertex(Collections.singletonList(dataset));
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java
similarity index 59%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
copy to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java
index 289021e3d2d..54d383c7532 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacet.java
@@ -18,26 +18,15 @@
package org.apache.flink.streaming.api.lineage;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
-import java.util.ArrayList;
-import java.util.List;
+import javax.annotation.Nonnull;
-/** Default implementation for {@link LineageVertex}. */
-@Internal
-public class DefaultLineageVertex implements LineageVertex {
- private List<LineageDataset> lineageDatasets;
+/**
+ * Facet exposing the {@link TypeInformation} of a lineage dataset, such as the records read by a
+ * source or written by a sink.
+ */
+@PublicEvolving
+public interface TypeDatasetFacet extends LineageDatasetFacet {
- public DefaultLineageVertex() {
- this.lineageDatasets = new ArrayList<>();
- }
-
- public void addLineageDataset(LineageDataset lineageDataset) {
- this.lineageDatasets.add(lineageDataset);
- }
-
- @Override
- public List<LineageDataset> datasets() {
- return lineageDatasets;
- }
+ // NOTE(review): raw TypeInformation; consider TypeInformation<?> before this API stabilizes.
+ /**
+ * Returns the type information of the dataset described by this facet.
+ *
+ * @return the type information, never {@code null}
+ */
+ @Nonnull
+ TypeInformation getTypeInformation();
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java
similarity index 59%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
copy to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java
index 289021e3d2d..2ff65599ea3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/TypeDatasetFacetProvider.java
@@ -18,26 +18,17 @@
package org.apache.flink.streaming.api.lineage;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Optional;
-/** Default implementation for {@link LineageVertex}. */
-@Internal
-public class DefaultLineageVertex implements LineageVertex {
- private List<LineageDataset> lineageDatasets;
+/**
+ * Provides the {@link TypeDatasetFacet} of a lineage dataset, when the implementing class is able
+ * to resolve it.
+ */
+@PublicEvolving
+public interface TypeDatasetFacetProvider {
- public DefaultLineageVertex() {
- this.lineageDatasets = new ArrayList<>();
- }
-
- public void addLineageDataset(LineageDataset lineageDataset) {
- this.lineageDatasets.add(lineageDataset);
- }
-
- @Override
- public List<LineageDataset> datasets() {
- return lineageDatasets;
- }
+ /**
+ * Returns a type dataset facet, or {@link Optional#empty()} in case the implementing class is
+ * not able to resolve the type.
+ *
+ * @return the type dataset facet, if it can be resolved
+ */
+ Optional<TypeDatasetFacet> getTypeDatasetFacet();
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
index f9fe10f9ae4..1226c8a0665 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
@@ -194,9 +194,7 @@ class LineageGraphUtilsTest {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new
HashMap<>());
- DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
- lineageVertex.addLineageDataset(lineageDataset);
- return lineageVertex;
+ return LineageUtils.lineageVertexOf(lineageDataset);
}
}
@@ -212,10 +210,7 @@ class LineageGraphUtilsTest {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new
HashMap<>());
- DefaultSourceLineageVertex lineageVertex =
- new DefaultSourceLineageVertex(Boundedness.BOUNDED);
- lineageVertex.addDataset(lineageDataset);
- return lineageVertex;
+ return LineageUtils.sourceLineageVertexOf(Boundedness.BOUNDED,
lineageDataset);
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java
new file mode 100644
index 00000000000..0a2e9a2afb6
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LineageUtils}. */
+class LineageUtilsTest {
+ private static final String TEST_NAME = "testName";
+ private static final String TEST_NAMESPACE = "testNameSpace";
+
+ @Test
+ void testDataSetOf() {
+ DefaultTypeDatasetFacet typeDatasetFacet = new DefaultTypeDatasetFacet(Types.BIG_INT);
+ LineageDataset dataset =
+ LineageUtils.datasetOf(TEST_NAME, TEST_NAMESPACE, typeDatasetFacet);
+
+ assertThat(dataset.name()).isEqualTo(TEST_NAME);
+ assertThat(dataset.namespace()).isEqualTo(TEST_NAMESPACE);
+ // The single facet must be registered under its own name.
+ assertThat(dataset.facets())
+ .hasSize(1)
+ .containsEntry(typeDatasetFacet.name(), typeDatasetFacet);
+ }
+
+ @Test
+ void testSourceLineageVertexOf() {
+ LineageDataset dataset =
+ LineageUtils.datasetOf(
+ TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT));
+ SourceLineageVertex sourceLineageVertex =
+ LineageUtils.sourceLineageVertexOf(Boundedness.CONTINUOUS_UNBOUNDED, dataset);
+
+ assertThat(sourceLineageVertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+ assertThat(sourceLineageVertex.datasets()).containsExactly(dataset);
+ }
+
+ @Test
+ void testLineageVertexOf() {
+ LineageDataset dataset =
+ LineageUtils.datasetOf(
+ TEST_NAME, TEST_NAMESPACE, new DefaultTypeDatasetFacet(Types.BIG_INT));
+ LineageVertex lineageVertex = LineageUtils.lineageVertexOf(dataset);
+
+ assertThat(lineageVertex.datasets()).containsExactly(dataset);
+ }
+}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
index 0478b6d409f..b9cc3c1313a 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
@@ -37,10 +37,10 @@ import
org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
-import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageGraph;
+import org.apache.flink.streaming.api.lineage.LineageUtils;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
@@ -322,9 +322,7 @@ public class JobStatusChangedListenerITCase extends
TestLogger {
LineageDataset lineageDataset =
new DefaultLineageDataset(
SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new
HashMap<>());
- DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
- lineageVertex.addLineageDataset(lineageDataset);
- return lineageVertex;
+ return LineageUtils.lineageVertexOf(lineageDataset);
}
}
}