This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1b8e776a915fd243e9088fb05be603b446cc663d Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Jul 6 12:00:17 2022 +0200 [FLINK-28644][datastream] Add DataStream#collectAsync --- docs/content.zh/docs/dev/datastream/overview.md | 9 +-- docs/content/docs/dev/datastream/overview.md | 9 +-- ...st_stream_execution_environment_completeness.py | 3 +- .../flink/streaming/api/datastream/DataStream.java | 84 +++++++++++++++++++++- .../environment/StreamExecutionEnvironment.java | 10 +++ .../flink/streaming/api/scala/DataStream.scala | 38 +++++++++- .../scala/StreamingScalaAPICompletenessTest.scala | 2 + .../datastream/DataStreamCollectTestITCase.java | 73 +++++++++++++++++++ 8 files changed, 209 insertions(+), 19 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/overview.md b/docs/content.zh/docs/dev/datastream/overview.md index 71da0af1ab2..2bf572edb45 100644 --- a/docs/content.zh/docs/dev/datastream/overview.md +++ b/docs/content.zh/docs/dev/datastream/overview.md @@ -658,21 +658,16 @@ Flink 还提供了一个 sink 来收集 DataStream 的结果,它用于测试 {{< tab "Java" >}} ```java -import org.apache.flink.streaming.experimental.DataStreamUtils - DataStream<Tuple2<String, Integer>> myResult = ... -Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult) +Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync(); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -import org.apache.flink.streaming.experimental.DataStreamUtils -import scala.collection.JavaConverters.asScalaIteratorConverter - val myResult: DataStream[(String, Int)] = ... 
-val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala +val myOutput: Iterator[(String, Int)] = myResult.collectAsync() ``` {{< /tab >}} {{< /tabs >}} diff --git a/docs/content/docs/dev/datastream/overview.md b/docs/content/docs/dev/datastream/overview.md index 1d36134b0c3..6b9d8f80713 100644 --- a/docs/content/docs/dev/datastream/overview.md +++ b/docs/content/docs/dev/datastream/overview.md @@ -777,21 +777,16 @@ Flink also provides a sink to collect DataStream results for testing and debuggi {{< tabs "125e228e-13b5-4c77-93a7-c0f436fcdd2f" >}} {{< tab "Java" >}} ```java -import org.apache.flink.streaming.experimental.DataStreamUtils; - DataStream<Tuple2<String, Integer>> myResult = ...; -Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult); +Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync(); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -import org.apache.flink.streaming.experimental.DataStreamUtils -import scala.collection.JavaConverters.asScalaIteratorConverter - val myResult: DataStream[(String, Int)] = ... 
-val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala +val myOutput: Iterator[(String, Int)] = myResult.collectAsync() ``` {{< /tab >}} {{< /tabs >}} diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index 2db58fc414c..7fcefe5d21e 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -49,7 +49,8 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'setNumberOfExecutionRetries', 'executeAsync', 'registerJobListener', 'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration', - 'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed'} + 'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed', + 'registerCollectIterator'} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index b0c3d649665..d2d28b49d6b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; @@ -1368,7 +1369,48 @@ public class DataStream<T> { } } - ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception { + /** + * Sets 
up the collection of the elements in this {@link DataStream}, and returns an iterator + * over the collected elements that can be used to retrieve elements once the job execution has + * started. + * + * <p>Caution: When multiple streams are being collected it is recommended to consume all + * streams in parallel to not back-pressure the job. + * + * <p>Caution: Closing the returned iterator cancels the job! It is recommended to close all + * iterators once you are no longer interested in any of the collected streams. + * + * <p>This method is functionally equivalent to {@link #collectAsync(Collector)}. + * + * @return iterator over the contained elements + */ + @Experimental + public CloseableIterator<T> collectAsync() { + final Collector<T> collector = new Collector<>(); + collectAsync(collector); + return collector.getOutput(); + } + + /** + * Sets up the collection of the elements in this {@link DataStream}, which can be retrieved + * later via the given {@link Collector}. + * + * <p>Caution: When multiple streams are being collected it is recommended to consume all + * streams in parallel to not back-pressure the job. + * + * <p>Caution: Closing the iterator from the collector cancels the job! It is recommended to + * close all iterators once you are no longer interested in any of the collected streams. + * + * <p>This method is functionally equivalent to {@link #collectAsync()}. + * + * <p>This method is meant to support use-cases where the application of a sink is done via a + * {@code Consumer<DataStream<T>>}, where it wouldn't be possible (or inconvenient) to return an + * iterator. 
+ * + * @param collector a collector that can be used to retrieve the elements + */ + @Experimental + public void collectAsync(Collector<T> collector) { TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig()); String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString(); @@ -1387,8 +1429,44 @@ public class DataStream<T> { sink.name("Data stream collect sink"); env.addOperator(sink.getTransformation()); - final JobClient jobClient = env.executeAsync(jobExecutionName); - iterator.setJobClient(jobClient); + env.registerCollectIterator(iterator); + collector.setIterator(iterator); + } + + /** + * This class acts as an accessor to elements collected via {@link #collectAsync(Collector)}. + * + * @param <T> the element type + */ + @Experimental + public static class Collector<T> { + private CloseableIterator<T> iterator; + + @Internal + void setIterator(CloseableIterator<T> iterator) { + this.iterator = iterator; + } + + /** + * Returns an iterator over the collected elements. The returned iterator must only be used + * once the job execution was triggered. + * + * <p>This method will always return the same iterator instance. 
+ * + * @return iterator over collected elements + */ + public CloseableIterator<T> getOutput() { + // we intentionally fail here instead of waiting, because it indicates a + // misunderstanding on the user and would usually just block the application + Preconditions.checkNotNull(iterator, "The Collector was not passed to DataStream#collectAsync(Collector) yet."); + return iterator; + } + } + + ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception { + final CloseableIterator<T> iterator = collectAsync(); + + final JobClient jobClient = getExecutionEnvironment().executeAsync(jobExecutionName); return new ClientAndIterator<>(jobClient, iterator); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 0aadce4615d..e298905a566 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -97,6 +97,7 @@ import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; import org.apache.flink.util.DynamicCodeLoadingException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -143,6 +144,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Public public class StreamExecutionEnvironment { + private final List<CollectResultIterator<?>> collectIterators = new ArrayList<>(); + + @Internal + public void registerCollectIterator(CollectResultIterator<?> iterator) { + 
collectIterators.add(iterator); + } + /** * The default name to use for a streaming job if no other name has been specified. * @@ -2168,6 +2176,8 @@ public class StreamExecutionEnvironment { try { JobClient jobClient = jobClientFuture.get(); jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null)); + collectIterators.forEach(iterator -> iterator.setJobClient(jobClient)); + collectIterators.clear(); return jobClient; } catch (ExecutionException executionException) { final Throwable strippedException = diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 2f45948e9fe..e0f05a2cb64 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Internal, Public, PublicEvolving} +import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.eventtime.{TimestampAssigner, WatermarkGenerator, WatermarkStrategy} import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} @@ -1202,6 +1202,42 @@ class DataStream[T](stream: JavaStream[T]) { def executeAndCollect(jobExecutionName: String, limit: Int): List[T] = stream.executeAndCollect(jobExecutionName, limit).asScala.toList + /** + * Sets up the collection of the elements in this [[DataStream]], and returns an iterator over the + * collected elements that can be used to retrieve elements once the job execution has started. + * + * <p>Caution: When multiple streams are being collected it is recommended to consume all streams + * in parallel to not back-pressure the job. 
+ * + * <p>Caution: Closing the returned iterator cancels the job! It is recommended to close all + * iterators once you are no longer interested in any of the collected streams. + * + * @return + * iterator over the contained elements + */ + @Experimental + def collectAsync(): CloseableIterator[T] = CloseableIterator.fromJava(stream.collectAsync()) + + /** + * Sets up the collection of the elements in this [[DataStream]], which can be retrieved later via + * the given [[Collector]]. + * + * <p>Caution: When multiple streams are being collected it is recommended to consume all streams + * in parallel to not back-pressure the job. + * + * <p>Caution: Closing the iterator from the collector cancels the job! It is recommended to close + * all iterators once you are no longer interested in any of the collected streams. + * + * <p>This method is meant to support use-cases where the application of a sink is done via a + * [[java.util.function.Consumer]], where it wouldn't be possible (or inconvenient) to return an + * iterator. + * + * @param collector + * a collector that can be used to retrieve the elements + */ + @Experimental + def collectAsync(collector: JavaStream.Collector[T]): Unit = stream.collectAsync(collector) + /** * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is * not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. 
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 804061090ca..35b205f2b74 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -63,6 +63,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTransformations", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" + ".areExplicitEnvironmentsAllowed", + "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" + + ".registerCollectIterator", // TypeHints are only needed for Java API, Scala API doesn't need them "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns", diff --git a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java index 575c8e7bf2d..5157f418869 100644 --- a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java @@ -31,6 +31,9 @@ import org.junit.Assert; import org.junit.Test; import java.util.List; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; /** * Tests for {@code DataStream} collect methods. 
@@ -107,4 +110,74 @@ public class DataStreamCollectTestITCase extends TestLogger { 1, results.size()); } + + @Test + public void testAsyncCollect() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5); + final DataStream<Integer> stream2 = env.fromElements(6, 7, 8, 9, 10); + + try (final CloseableIterator<Integer> iterator1 = stream1.collectAsync(); + final CloseableIterator<Integer> iterator2 = stream2.collectAsync()) { + env.executeAsync(); + + for (int x = 1; x < 6; x++) { + assertThat(iterator1.hasNext()).isTrue(); + assertThat(iterator1.next()).isEqualTo(x); + } + + for (int x = 6; x < 11; x++) { + assertThat(iterator2.hasNext()).isTrue(); + assertThat(iterator2.next()).isEqualTo(x); + } + } + } + + @Test + public void testAsyncCollectWithCollector() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream.Collector<Integer> collector1 = new DataStream.Collector<>(); + final DataStream.Collector<Integer> collector2 = new DataStream.Collector<>(); + + defineWorkflowAndApplySink( + env, + stream -> stream.collectAsync(collector1), + stream -> stream.collectAsync(collector2)); + + try (final CloseableIterator<Integer> iterator1 = collector1.getOutput(); + final CloseableIterator<Integer> iterator2 = collector2.getOutput()) { + env.executeAsync(); + + for (int x = 1; x < 6; x++) { + assertThat(iterator1.hasNext()).isTrue(); + assertThat(iterator1.next()).isEqualTo(x); + } + + for (int x = 6; x < 11; x++) { + assertThat(iterator2.hasNext()).isTrue(); + assertThat(iterator2.next()).isEqualTo(x); + } + } + } + + /** + * This method, while looking odd, was intentionally added to show-case what use-case {@link + * DataStream#collectAsync(DataStream.Collector)} serves (w.r.t. the Consumer). 
+ * + * <p>If whatever refactoring you're thinking of doesn't support this method in a convenient way + * then you should reconsider it. + */ + private static void defineWorkflowAndApplySink( + StreamExecutionEnvironment env, + Consumer<DataStream<Integer>> sink1Applier, + Consumer<DataStream<Integer>> sink2Applier) { + + final DataStream<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5); + final DataStream<Integer> stream2 = env.fromElements(6, 7, 8, 9, 10); + + sink1Applier.accept(stream1); + sink2Applier.accept(stream2); + } }
