This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bcacea9ca37 [FLINK-34657] extract lineage info for stream API (#25056)
bcacea9ca37 is described below
commit bcacea9ca376be62e83f9927393af0418f6f0711
Author: Peter Huang <[email protected]>
AuthorDate: Sat Jul 20 07:01:31 2024 -0700
[FLINK-34657] extract lineage info for stream API (#25056)
---
.../streaming/api/datastream/DataStreamSink.java | 12 +-
.../api/lineage/DefaultLineageVertex.java | 43 +++++
.../api/lineage/DefaultSourceLineageVertex.java | 52 ++++++
.../LegacySourceTransformation.java | 10 ++
.../api/transformations/SourceTransformation.java | 9 +
.../api/lineage/LineageGraphUtilsTest.java | 188 +++++++++++++++++++++
.../execution/JobStatusChangedListenerITCase.java | 68 +++++++-
7 files changed, 379 insertions(+), 3 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 7b8d241e6ec..7bcc7b3e95e 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -55,13 +56,18 @@ public class DataStreamSink<T> {
StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction);
final StreamExecutionEnvironment executionEnvironment =
inputStream.getExecutionEnvironment();
- PhysicalTransformation<T> transformation =
+ LegacySinkTransformation<T> transformation =
new LegacySinkTransformation<>(
inputStream.getTransformation(),
"Unnamed",
sinkOperator,
executionEnvironment.getParallelism(),
false);
+ if (sinkFunction instanceof LineageVertexProvider) {
+ transformation.setLineageVertex(
+ ((LineageVertexProvider) sinkFunction).getLineageVertex());
+ }
+
executionEnvironment.addOperator(transformation);
return new DataStreamSink<>(transformation);
}
@@ -82,6 +88,10 @@ public class DataStreamSink<T> {
executionEnvironment.getParallelism(),
false,
customSinkOperatorUidHashes);
+ if (sink instanceof LineageVertexProvider) {
+ transformation.setLineageVertex(((LineageVertexProvider)
sink).getLineageVertex());
+ }
+
executionEnvironment.addOperator(transformation);
return new DataStreamSink<>(transformation);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
new file mode 100644
index 00000000000..289021e3d2d
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageVertex.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default implementation for {@link LineageVertex}.
+ *
+ * <p>Datasets are kept in an internal list in the order in which they were passed to {@link
+ * #addLineageDataset(LineageDataset)}.
+ */
+@Internal
+public class DefaultLineageVertex implements LineageVertex {
+    /** Datasets attached to this vertex; the list reference is fixed at construction time. */
+    private final List<LineageDataset> lineageDatasets;
+
+    /** Creates a vertex that initially holds no datasets. */
+    public DefaultLineageVertex() {
+        this.lineageDatasets = new ArrayList<>();
+    }
+
+    /**
+     * Appends a dataset to this vertex.
+     *
+     * @param lineageDataset the dataset to append
+     */
+    public void addLineageDataset(LineageDataset lineageDataset) {
+        this.lineageDatasets.add(lineageDataset);
+    }
+
+    /**
+     * Returns the datasets added so far.
+     *
+     * @return the backing list itself (not a copy), so later additions are visible through it
+     */
+    @Override
+    public List<LineageDataset> datasets() {
+        return lineageDatasets;
+    }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
new file mode 100644
index 00000000000..fbc4ac4b2d4
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultSourceLineageVertex.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default implementation for {@link SourceLineageVertex}.
+ *
+ * <p>The boundedness is fixed at construction time; datasets are appended afterwards through
+ * {@link #addDataset(LineageDataset)}.
+ */
+@Internal
+public class DefaultSourceLineageVertex implements SourceLineageVertex {
+    /** Boundedness reported for this source; never changes after construction. */
+    private final Boundedness boundedness;
+
+    /** Datasets attached to this vertex; the list reference is fixed at construction time. */
+    private final List<LineageDataset> lineageDatasets;
+
+    /**
+     * Creates a source vertex that initially holds no datasets.
+     *
+     * @param boundedness the boundedness later returned by {@link #boundedness()}
+     */
+    public DefaultSourceLineageVertex(Boundedness boundedness) {
+        this.lineageDatasets = new ArrayList<>();
+        this.boundedness = boundedness;
+    }
+
+    /**
+     * Appends a dataset to this vertex.
+     *
+     * @param lineageDataset the dataset to append
+     */
+    public void addDataset(LineageDataset lineageDataset) {
+        this.lineageDatasets.add(lineageDataset);
+    }
+
+    /**
+     * Returns the datasets added so far.
+     *
+     * @return the backing list itself (not a copy), so later additions are visible through it
+     */
+    @Override
+    public List<LineageDataset> datasets() {
+        return this.lineageDatasets;
+    }
+
+    /**
+     * Returns the boundedness given to the constructor.
+     *
+     * @return the boundedness of this source
+     */
+    @Override
+    public Boundedness boundedness() {
+        return this.boundedness;
+    }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
index 51b101e47b6..c77fc4d154b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -69,6 +71,7 @@ public class LegacySourceTransformation<T> extends TransformationWithLineage<T>
super(name, outputType, parallelism, parallelismConfigured);
this.operatorFactory =
checkNotNull(SimpleOperatorFactory.of(operator));
this.boundedness = checkNotNull(boundedness);
+ this.extractLineageVertex(operator);
}
/** Mutable for legacy sources in the Table API. */
@@ -105,4 +108,11 @@ public class LegacySourceTransformation<T> extends TransformationWithLineage<T>
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}
+
+    /**
+     * Sets this transformation's lineage vertex from the operator's user function when that
+     * function implements {@link LineageVertexProvider}; otherwise does nothing.
+     *
+     * @param operator the source operator whose user function is inspected
+     */
+    private void extractLineageVertex(StreamSource<T, ?> operator) {
+        // A wildcard avoids the raw type; only the function's runtime type is checked below.
+        SourceFunction<?> sourceFunction = operator.getUserFunction();
+        if (sourceFunction instanceof LineageVertexProvider) {
+            setLineageVertex(((LineageVertexProvider) sourceFunction).getLineageVertex());
+        }
+    }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index c5190d4477d..18389746a74 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import javax.annotation.Nullable;
@@ -64,6 +65,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
super(name, outputType, parallelism);
this.source = source;
this.watermarkStrategy = watermarkStrategy;
+ this.extractLineageVertex();
}
public SourceTransformation(
@@ -76,6 +78,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
super(name, outputType, parallelism, parallelismConfigured);
this.source = source;
this.watermarkStrategy = watermarkStrategy;
+ this.extractLineageVertex();
}
public Source<OUT, SplitT, EnumChkT> getSource() {
@@ -118,4 +121,10 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
public String getCoordinatorListeningID() {
return coordinatorListeningID;
}
+
+ private void extractLineageVertex() {
+ if (source instanceof LineageVertexProvider) {
+ setLineageVertex(((LineageVertexProvider)
source).getLineageVertex());
+ }
+ }
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
new file mode 100644
index 00000000000..75898fedee5
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testing for lineage graph util. */
+class LineageGraphUtilsTest {
+ private static final String SOURCE_DATASET_NAME = "LineageSource";
+ private static final String SOURCE_DATASET_NAMESPACE =
"source://LineageSource";
+ private static final String SINK_DATASET_NAME = "LineageSink";
+ private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";
+
+ private static final String LEGACY_SOURCE_DATASET_NAME =
"LineageSourceFunction";
+ private static final String LEGACY_SOURCE_DATASET_NAMESPACE =
"source://LineageSourceFunction";
+ private static final String LEGACY_SINK_DATASET_NAME =
"LineageSinkFunction";
+ private static final String LEGACY_SINK_DATASET_NAMESPACE =
"sink://LineageSinkFunction";
+
+ @Test
+ void testExtractLineageGraphFromLegacyTransformations() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Long> source = env.addSource(new
LineageSourceFunction());
+ DataStreamSink<Long> sink = source.addSink(new LineageSinkFunction());
+
+ LineageGraph lineageGraph =
+
LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));
+
+ assertThat(lineageGraph.sources().size()).isEqualTo(1);
+ assertThat(lineageGraph.sources().get(0).boundedness())
+ .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+
assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
+ assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
+ .isEqualTo(LEGACY_SOURCE_DATASET_NAME);
+ assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
+ .isEqualTo(LEGACY_SOURCE_DATASET_NAMESPACE);
+
+ assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+ assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
+ assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
+ .isEqualTo(LEGACY_SINK_DATASET_NAME);
+ assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
+ .isEqualTo(LEGACY_SINK_DATASET_NAMESPACE);
+
+ assertThat(lineageGraph.relations().size()).isEqualTo(1);
+ }
+
+ @Test
+ void testExtractLineageGraphFromTransformations() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Long> source =
+ env.fromSource(new LineageSource(1L, 5L),
WatermarkStrategy.noWatermarks(), "");
+ DataStreamSink<Long> sink = source.sinkTo(new LineageSink());
+
+ LineageGraph lineageGraph =
+
LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));
+
+ assertThat(lineageGraph.sources().size()).isEqualTo(1);
+
assertThat(lineageGraph.sources().get(0).boundedness()).isEqualTo(Boundedness.BOUNDED);
+
assertThat(lineageGraph.sources().get(0).datasets().size()).isEqualTo(1);
+ assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
+ .isEqualTo(SOURCE_DATASET_NAME);
+ assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
+ .isEqualTo(SOURCE_DATASET_NAMESPACE);
+
+ assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+ assertThat(lineageGraph.sinks().get(0).datasets().size()).isEqualTo(1);
+ assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
+ .isEqualTo(SINK_DATASET_NAME);
+ assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
+ .isEqualTo(SINK_DATASET_NAMESPACE);
+
+ assertThat(lineageGraph.relations().size()).isEqualTo(1);
+ }
+
+ private static class LineageSink extends DiscardingSink<Long> implements
LineageVertexProvider {
+ public LineageSink() {
+ super();
+ }
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ LineageDataset lineageDataset =
+ new DefaultLineageDataset(
+ SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new
HashMap<>());
+ DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
+ lineageVertex.addLineageDataset(lineageDataset);
+ return lineageVertex;
+ }
+ }
+
+ private static class LineageSource extends NumberSequenceSource
+ implements LineageVertexProvider {
+
+ public LineageSource(long from, long to) {
+ super(from, to);
+ }
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ LineageDataset lineageDataset =
+ new DefaultLineageDataset(
+ SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new
HashMap<>());
+ DefaultSourceLineageVertex lineageVertex =
+ new DefaultSourceLineageVertex(Boundedness.BOUNDED);
+ lineageVertex.addDataset(lineageDataset);
+ return lineageVertex;
+ }
+ }
+
+ private static class LineageSourceFunction
+ implements SourceFunction<Long>, LineageVertexProvider {
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Long> ctx) throws Exception {
+ long next = 0;
+ while (running) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(next++);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ LineageDataset lineageDataset =
+ new DefaultLineageDataset(
+ LEGACY_SOURCE_DATASET_NAME,
+ LEGACY_SOURCE_DATASET_NAMESPACE,
+ new HashMap<>());
+ DefaultSourceLineageVertex lineageVertex =
+ new
DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
+ lineageVertex.addDataset(lineageDataset);
+ return lineageVertex;
+ }
+ }
+
+ private static class LineageSinkFunction implements SinkFunction<Long>,
LineageVertexProvider {
+
+ @Override
+ public LineageVertex getLineageVertex() {
+ LineageDataset lineageDataset =
+ new DefaultLineageDataset(
+ LEGACY_SINK_DATASET_NAME,
+ LEGACY_SINK_DATASET_NAMESPACE,
+ new HashMap<>());
+ DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
+ lineageVertex.addLineageDataset(lineageDataset);
+ return lineageVertex;
+ }
+ }
+}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
index dd58ef184dc..8530aec01c6 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
@@ -36,7 +37,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
+import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
+import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
+import org.apache.flink.streaming.api.lineage.LineageVertex;
+import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.streaming.runtime.execution.JobCreatedEvent;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
@@ -51,6 +60,7 @@ import org.junit.runners.MethodSorters;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import static
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
@@ -61,6 +71,11 @@ import static org.assertj.core.api.Assertions.assertThat;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class JobStatusChangedListenerITCase extends TestLogger {
private static final int PARALLELISM = 4;
+ private static final String SOURCE_DATASET_NAME = "LineageSource";
+ private static final String SOURCE_DATASET_NAMESPACE =
"source://LineageSource";
+ private static final String SINK_DATASET_NAME = "LineageSink";
+ private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";
+
@ClassRule public static final TemporaryFolder TMP_FOLDER = new
TemporaryFolder();
@ClassRule
@@ -162,6 +177,7 @@ public class JobStatusChangedListenerITCase extends TestLogger {
StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = streamGraph.getJobGraph();
+ verifyLineageGraph(streamGraph.getLineageGraph());
ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
JobID jobID = client.submitJob(jobGraph).get();
waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
@@ -183,9 +199,35 @@ public class JobStatusChangedListenerITCase extends TestLogger {
|| (status.oldStatus() ==
JobStatus.CANCELLING
&& status.newStatus() ==
JobStatus.CANCELED))
.isTrue();
+
+ if (event instanceof JobCreatedEvent) {
+ LineageGraph lineageGraph = ((JobCreatedEvent)
event).lineageGraph();
+ assertThat(lineageGraph.sources().size()).isEqualTo(1);
+ assertThat(lineageGraph.sinks().size()).isEqualTo(1);
+ }
});
}
+    /**
+     * Asserts that {@code lineageGraph} holds exactly one continuous-unbounded source, one sink
+     * and one relation, and that the source and sink each carry a single dataset whose name and
+     * namespace match this test's {@code SOURCE_*} / {@code SINK_*} constants.
+     *
+     * @param lineageGraph the lineage graph to check
+     */
+    void verifyLineageGraph(LineageGraph lineageGraph) {
+        // hasSize reports the collection's contents on failure, unlike comparing size() to 1.
+        assertThat(lineageGraph.sources()).hasSize(1);
+        assertThat(lineageGraph.sources().get(0).boundedness())
+                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
+        assertThat(lineageGraph.sources().get(0).datasets()).hasSize(1);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).name())
+                .isEqualTo(SOURCE_DATASET_NAME);
+        assertThat(lineageGraph.sources().get(0).datasets().get(0).namespace())
+                .isEqualTo(SOURCE_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.sinks()).hasSize(1);
+        assertThat(lineageGraph.sinks().get(0).datasets()).hasSize(1);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).name())
+                .isEqualTo(SINK_DATASET_NAME);
+        assertThat(lineageGraph.sinks().get(0).datasets().get(0).namespace())
+                .isEqualTo(SINK_DATASET_NAMESPACE);
+
+        assertThat(lineageGraph.relations()).hasSize(1);
+    }
+
void verifyEventMetaData() {
assertThat(statusChangedEvents.size()).isEqualTo(3);
assertThat(statusChangedEvents.get(0).jobId())
@@ -238,7 +280,8 @@ public class JobStatusChangedListenerITCase extends TestLogger {
public void cancel() {}
}
- private static class InfiniteLongSourceFunction implements
SourceFunction<Long> {
+ private static class InfiniteLongSourceFunction
+ implements SourceFunction<Long>, LineageVertexProvider {
private volatile boolean running = true;
@Override
@@ -255,12 +298,33 @@ public class JobStatusChangedListenerITCase extends TestLogger {
public void cancel() {
running = false;
}
+
+        /** Builds a continuous-unbounded source vertex holding the single source dataset. */
+        @Override
+        public LineageVertex getLineageVertex() {
+            final DefaultSourceLineageVertex vertex =
+                    new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
+            final LineageDataset dataset =
+                    new DefaultLineageDataset(
+                            SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
+            vertex.addDataset(dataset);
+            return vertex;
+        }
}
- private static class SleepingSink implements SinkFunction<Long> {
+ private static class SleepingSink implements SinkFunction<Long>,
LineageVertexProvider {
@Override
public void invoke(Long value, Context context) throws Exception {
Thread.sleep(1_000);
}
+
+        /** Builds a lineage vertex holding the single sink dataset. */
+        @Override
+        public LineageVertex getLineageVertex() {
+            final DefaultLineageVertex vertex = new DefaultLineageVertex();
+            final LineageDataset dataset =
+                    new DefaultLineageDataset(
+                            SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
+            vertex.addLineageDataset(dataset);
+            return vertex;
+        }
}
}