This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae23ab6c9afa6707f8fe3472ac4caa286e875ef7 Author: Stephan Ewen <[email protected]> AuthorDate: Wed Sep 2 15:23:27 2020 +0200 [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible. This supports simple ways of pulling bounded streams to the client, as well as a defined number of elements from an unbounded stream. --- .../streaming/api/datastream/DataStreamUtils.java | 136 +++++++++++++++++++-- 1 file changed, 124 insertions(+), 12 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java index a4f1560..a1acc9b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java @@ -31,9 +31,14 @@ import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A collection of utilities for {@link DataStream DataStreams}. */ @@ -41,34 +46,124 @@ import java.util.UUID; public final class DataStreamUtils { /** - * Returns an iterator to iterate over the elements of the DataStream. - * @return The iterator + * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements + * of the given DataStream. 
+ * + * <p>The DataStream application is executed in the regular distributed manner on the target environment, + * and the events from the stream are polled back to this application process and thread through + * Flink's REST API. */ public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) { + return collect(stream, "Data Stream Collect"); + } + + /** + * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements + * of the given DataStream. + * + * <p>The DataStream application is executed in the regular distributed manner on the target environment, + * and the events from the stream are polled back to this application process and thread through + * Flink's REST API. + */ + public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream, String executionJobName) { + try { + return collectWithClient(stream, executionJobName).iterator; + } catch (Exception e) { + // this "wrap as unchecked" step is here only to preserve the exception signature + // backwards compatible. + throw new RuntimeException("Failed to execute data stream", e); + } + } + + /** + * Starts the execution of the program and returns an iterator to read the result of the + * given data stream, plus a {@link JobClient} to interact with the application execution. 
+ */ + public static <OUT> ClientAndIterator<OUT> collectWithClient( + DataStream<OUT> stream, + String jobExecutionName) throws Exception { + TypeSerializer<OUT> serializer = stream.getType().createSerializer( - stream.getExecutionEnvironment().getConfig()); + stream.getExecutionEnvironment().getConfig()); String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString(); + StreamExecutionEnvironment env = stream.getExecutionEnvironment(); CollectSinkOperatorFactory<OUT> factory = new CollectSinkOperatorFactory<>(serializer, accumulatorName); CollectSinkOperator<OUT> operator = (CollectSinkOperator<OUT>) factory.getOperator(); CollectResultIterator<OUT> iterator = new CollectResultIterator<>( - operator.getOperatorIdFuture(), serializer, accumulatorName); + operator.getOperatorIdFuture(), serializer, accumulatorName); CollectStreamSink<OUT> sink = new CollectStreamSink<>(stream, factory); sink.name("Data stream collect sink"); - - StreamExecutionEnvironment env = stream.getExecutionEnvironment(); env.addOperator(sink.getTransformation()); - try { - JobClient jobClient = env.executeAsync("Data Stream Collect"); - iterator.setJobClient(jobClient); - } catch (Exception e) { - throw new RuntimeException("Failed to execute data stream", e); + final JobClient jobClient = env.executeAsync(jobExecutionName); + iterator.setJobClient(jobClient); + + return new ClientAndIterator<>(jobClient, iterator); + } + + /** + * Collects the contents of the given DataStream into a list, assuming that the stream is a bounded stream. + * + * <p>This method blocks until the job execution is complete. By the time the method returns, the + * job will have reached its FINISHED status. + * + * <p>Note that if the stream is unbounded, this method will never return and might fail with an + * Out-of-Memory Error because it attempts to collect an infinite stream into a list. + * + * @throws Exception Exceptions that occur during the execution are forwarded.
+ */ + public static <E> List<E> collectBoundedStream(DataStream<E> stream, String jobName) throws Exception { + final ArrayList<E> list = new ArrayList<>(); + final Iterator<E> iter = collectWithClient(stream, jobName).iterator; + while (iter.hasNext()) { + list.add(iter.next()); } + list.trimToSize(); + return list; + } + + /** + * Triggers execution of the DataStream application and collects the given number of records from the stream. + * After the records are received, the execution is canceled. + */ + public static <E> List<E> collectUnboundedStream(DataStream<E> stream, int numElements, String jobName) throws Exception { + final ClientAndIterator<E> clientAndIterator = collectWithClient(stream, jobName); + final List<E> result = collectRecordsFromUnboundedStream(clientAndIterator, numElements); - return iterator; + // cancel the job now that we have received enough elements + clientAndIterator.client.cancel().get(); + + return result; } + public static <E> List<E> collectRecordsFromUnboundedStream( + final ClientAndIterator<E> client, + final int numElements) { + + checkNotNull(client, "client"); + checkArgument(numElements > 0, "numElements must be > 0"); + + final ArrayList<E> result = new ArrayList<>(numElements); + final Iterator<E> iterator = client.iterator; + + while (iterator.hasNext()) { + result.add(iterator.next()); + if (result.size() == numElements) { + return result; + } + } + + throw new IllegalArgumentException(String.format( + "The stream ended before reaching the requested %d records. Only %d records were received.", + numElements, result.size())); + } + + // ------------------------------------------------------------------------ + // Deriving a KeyedStream from a stream already partitioned by key + // without a shuffle + // ------------------------------------------------------------------------ + /** * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given * {@link KeySelector}.
@@ -130,4 +225,21 @@ public final class DataStreamUtils { * Private constructor to prevent instantiation. */ private DataStreamUtils() {} + + // ------------------------------------------------------------------------ + + /** + * A pair of an {@link Iterator} to receive results from a streaming application and a + * {@link JobClient} to interact with the program. + */ + public static final class ClientAndIterator<E> { + + public final JobClient client; + public final Iterator<E> iterator; + + ClientAndIterator(JobClient client, Iterator<E> iterator) { + this.client = checkNotNull(client); + this.iterator = checkNotNull(iterator); + } + } }
