aljoscha commented on a change in pull request #13752:
URL: https://github.com/apache/flink/pull/13752#discussion_r511910670



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##########
@@ -1294,6 +1303,92 @@ public ExecutionConfig getExecutionConfig() {
                return sink;
        }
 
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        *
+        *<p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+        */
+       public CloseableIterator<T> executeAndCollect() throws Exception {
+               return executeAndCollect("DataStream Collect");
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        *
+        *<p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+        */
+       public CloseableIterator<T> executeAndCollect(String jobExecutionName) throws Exception {
+               return executeAndCollectWithClient(jobExecutionName).iterator;
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        */
+       public List<T> executeAndCollect(int limit) throws Exception {
+               return executeAndCollect("DataStream Collect", limit);
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        */
+       public List<T> executeAndCollect(String jobExecutionName, int limit) throws Exception {
+               Preconditions.checkState(limit > 0, "Limit must be greater than 0");
+
+               ClientAndIterator<T> clientAndIterator = executeAndCollectWithClient(jobExecutionName);
+
+               try {
+                       List<T> results = new ArrayList<>(limit);
+                       while (clientAndIterator.iterator.hasNext() && limit > 0) {
+                               results.add(clientAndIterator.iterator.next());
+                               limit--;
+                       }
+
+                       return results;
+               } finally {
+                       clientAndIterator.iterator.close();

Review comment:
       This makes me wonder why `ClientAndIterator` isn't `AutoCloseable`, with a
       `close()` that does both of these: closing the iterator and cancelling the client.
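
A minimal sketch of what that suggestion could look like. It is not the PR's code: `Client` and `ClosingIterator` are hypothetical stand-ins for whatever types the `client` and `iterator` fields have in the PR, defined here only so the example compiles on its own, and `collect` is an illustrative helper mirroring the `limit` loop in the diff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ClientAndIteratorSketch {

    /** Hypothetical stand-in for the job client; only cancel() is modeled. */
    interface Client {
        void cancel();
    }

    /** Hypothetical stand-in for a closeable iterator. */
    interface ClosingIterator<T> extends Iterator<T>, AutoCloseable {}

    /** Pairs both resources and releases them together in close(). */
    static final class ClientAndIterator<T> implements AutoCloseable {
        final Client client;
        final ClosingIterator<T> iterator;

        ClientAndIterator(Client client, ClosingIterator<T> iterator) {
            this.client = client;
            this.iterator = iterator;
        }

        @Override
        public void close() throws Exception {
            try {
                iterator.close();
            } finally {
                // Cancel the job even if closing the iterator throws.
                client.cancel();
            }
        }
    }

    /** Collects at most 'limit' elements, then releases both resources. */
    static <T> List<T> collect(ClientAndIterator<T> resources, int limit) throws Exception {
        if (limit <= 0) {
            throw new IllegalStateException("Limit must be greater than 0");
        }
        try (ClientAndIterator<T> r = resources) {
            List<T> results = new ArrayList<>(limit);
            while (results.size() < limit && r.iterator.hasNext()) {
                results.add(r.iterator.next());
            }
            return results;
        }
    }
}
```

With this shape the caller's `try`/`finally` collapses into a single try-with-resources statement, and the cancel can no longer be forgotten when the iterator is closed.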

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##########
@@ -1294,6 +1303,92 @@ public ExecutionConfig getExecutionConfig() {
                return sink;
        }
 
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        *
+        *<p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+        */
+       public CloseableIterator<T> executeAndCollect() throws Exception {
+               return executeAndCollect("DataStream Collect");
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        *
+        *<p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+        */
+       public CloseableIterator<T> executeAndCollect(String jobExecutionName) throws Exception {
+               return executeAndCollectWithClient(jobExecutionName).iterator;
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        */
+       public List<T> executeAndCollect(int limit) throws Exception {
+               return executeAndCollect("DataStream Collect", limit);
+       }
+
+       /**
+        * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+        * of the given DataStream.
+        *
+        * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+        * and the events from the stream are polled back to this application process and thread through
+        * Flink's REST API.
+        */
+       public List<T> executeAndCollect(String jobExecutionName, int limit) throws Exception {
+               Preconditions.checkState(limit > 0, "Limit must be greater than 0");
+
+               ClientAndIterator<T> clientAndIterator = executeAndCollectWithClient(jobExecutionName);
+
+               try {
+                       List<T> results = new ArrayList<>(limit);
+                       while (clientAndIterator.iterator.hasNext() && limit > 0) {
+                               results.add(clientAndIterator.iterator.next());
+                               limit--;
+                       }
+
+                       return results;
+               } finally {
+                       clientAndIterator.iterator.close();
+                       clientAndIterator.client.cancel();
+               }
+       }
+
+       private ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception {

Review comment:
       Are you duplicating the code here because we want to remove
       `DataStreamUtils` in the future?




