This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae23ab6c9afa6707f8fe3472ac4caa286e875ef7
Author: Stephan Ewen <[email protected]>
AuthorDate: Wed Sep 2 15:23:27 2020 +0200

    [refactor][DataStream API] Make DataStreamUtils.collect() methods more 
flexible.
    
    This supports simple ways of pulling bounded streams to the client, as well 
as a defined number of
    elements from an unbounded stream.
---
 .../streaming/api/datastream/DataStreamUtils.java  | 136 +++++++++++++++++++--
 1 file changed, 124 insertions(+), 12 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index a4f1560..a1acc9b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -31,9 +31,14 @@ import 
org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A collection of utilities for {@link DataStream DataStreams}.
  */
@@ -41,34 +46,124 @@ import java.util.UUID;
 public final class DataStreamUtils {
 
        /**
-        * Returns an iterator to iterate over the elements of the DataStream.
-        * @return The iterator
+        * Triggers the distributed execution of the streaming dataflow and 
returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed 
manner on the target environment,
+        * and the events from the stream are polled back to this application 
process and thread through
+        * Flink's REST API.
         */
        public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+               return collect(stream, "Data Stream Collect");
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and 
returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed 
manner on the target environment,
+        * and the events from the stream are polled back to this application 
process and thread through
+        * Flink's REST API.
+        */
+       public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream, 
String executionJobName) {
+               try {
+                       return collectWithClient(stream, 
executionJobName).iterator;
+               } catch (Exception e) {
+                       // this "wrap as unchecked" step is here only to 
keep the exception signature
+                       // backwards compatible.
+                       throw new RuntimeException("Failed to execute data 
stream", e);
+               }
+       }
+
+       /**
+        * Starts the execution of the program and returns an iterator to read 
the result of the
+        * given data stream, plus a {@link JobClient} to interact with the 
application execution.
+        */
+       public static <OUT> ClientAndIterator<OUT> collectWithClient(
+                       DataStream<OUT> stream,
+                       String jobExecutionName) throws Exception {
+
                TypeSerializer<OUT> serializer = 
stream.getType().createSerializer(
-                       stream.getExecutionEnvironment().getConfig());
+                               stream.getExecutionEnvironment().getConfig());
                String accumulatorName = "dataStreamCollect_" + 
UUID.randomUUID().toString();
 
+               StreamExecutionEnvironment env = 
stream.getExecutionEnvironment();
                CollectSinkOperatorFactory<OUT> factory = new 
CollectSinkOperatorFactory<>(serializer, accumulatorName);
                CollectSinkOperator<OUT> operator = (CollectSinkOperator<OUT>) 
factory.getOperator();
                CollectResultIterator<OUT> iterator = new 
CollectResultIterator<>(
-                       operator.getOperatorIdFuture(), serializer, 
accumulatorName);
+                               operator.getOperatorIdFuture(), serializer, 
accumulatorName);
                CollectStreamSink<OUT> sink = new CollectStreamSink<>(stream, 
factory);
                sink.name("Data stream collect sink");
-
-               StreamExecutionEnvironment env = 
stream.getExecutionEnvironment();
                env.addOperator(sink.getTransformation());
 
-               try {
-                       JobClient jobClient = env.executeAsync("Data Stream 
Collect");
-                       iterator.setJobClient(jobClient);
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to execute data 
stream", e);
+               final JobClient jobClient = env.executeAsync(jobExecutionName);
+               iterator.setJobClient(jobClient);
+
+               return new ClientAndIterator<>(jobClient, iterator);
+       }
+
+       /**
+        * Collects the contents of the given DataStream into a list, assuming that 
the stream is a bounded stream.
+        *
+        * <p>This method blocks until the job execution is complete. By the 
time the method returns, the
+        * job will have reached its FINISHED status.
+        *
+        * <p>Note that if the stream is unbounded, this method will never 
return and might fail with an
+        * Out-of-Memory Error because it attempts to collect an infinite 
stream into a list.
+        *
+        * @throws Exception Exceptions that occur during the execution are 
forwarded.
+        */
+       public static <E> List<E> collectBoundedStream(DataStream<E> stream, 
String jobName) throws Exception {
+               final ArrayList<E> list = new ArrayList<>();
+               final Iterator<E> iter = collectWithClient(stream, 
jobName).iterator;
+               while (iter.hasNext()) {
+                       list.add(iter.next());
                }
+               list.trimToSize();
+               return list;
+       }
+
+       /**
+        * Triggers execution of the DataStream application and collects the 
given number of records from the stream.
+        * After the records are received, the execution is canceled.
+        */
+       public static <E> List<E> collectUnboundedStream(DataStream<E> stream, 
int numElements, String jobName) throws Exception {
+               final ClientAndIterator<E> clientAndIterator = 
collectWithClient(stream, jobName);
+               final List<E> result = 
collectRecordsFromUnboundedStream(clientAndIterator, numElements);
 
-               return iterator;
+               // cancel the job now that we have received enough elements
+               clientAndIterator.client.cancel().get();
+
+               return result;
        }
 
+       public static <E> List<E> collectRecordsFromUnboundedStream(
+                       final ClientAndIterator<E> client,
+                       final int numElements) {
+
+               checkNotNull(client, "client");
+               checkArgument(numElements > 0, "numElement must be > 0");
+
+               final ArrayList<E> result = new ArrayList<>(numElements);
+               final Iterator<E> iterator = client.iterator;
+
+               while (iterator.hasNext()) {
+                       result.add(iterator.next());
+                       if (result.size() == numElements) {
+                               return result;
+                       }
+               }
+
+               throw new IllegalArgumentException(String.format(
+                               "The stream ended before reaching the requested 
%d records. Only %d records were received.",
+                               numElements, result.size()));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Deriving a KeyedStream from a stream already partitioned by key
+       //  without a shuffle
+       // 
------------------------------------------------------------------------
+
        /**
         * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, 
which extracts keys with the given
         * {@link KeySelector}.
@@ -130,4 +225,21 @@ public final class DataStreamUtils {
         * Private constructor to prevent instantiation.
         */
        private DataStreamUtils() {}
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A pair of an {@link Iterator} to receive results from a streaming 
application and a
+        * {@link JobClient} to interact with the program.
+        */
+       public static final class ClientAndIterator<E> {
+
+               public final JobClient client;
+               public final Iterator<E> iterator;
+
+               ClientAndIterator(JobClient client, Iterator<E> iterator) {
+                       this.client = checkNotNull(client);
+                       this.iterator = checkNotNull(iterator);
+               }
+       }
 }

Reply via email to