Gábor Hermann commented on FLINK-3133:

What's the status of this? We already have collect() implemented in
[FLINK-1670|https://issues.apache.org/jira/browse/FLINK-1670], but it starts
the execution, so we can only use it for one DataStream. For testing, it would
be great to get the output of multiple DataStreams, as in the design [~kenmy] proposed.

Another design I can imagine:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> printSink = env.addSource(..).print();
DataStream<String> otherSink = env.addSource(..).map(..).filter(..).print();
Future<List<Integer>> printSinkResults = printSink.collect();
Future<List<String>> otherSinkResults = otherSink.collect();
{code}
Here the Futures would complete once the execution has finished. However, this
would require users to know how to use Futures.
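To make the Future-based idea more concrete, here is a rough, self-contained sketch that runs without Flink. `FutureCollector` and `SimulatedEnv` are hypothetical names, not Flink API. I'm assuming each collect() call would register a buffering sink with the environment, and that the environment completes every registered Future only after the whole job has run. That is what would let one execution return results for several streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, NOT Flink API: a sink-side buffer whose Future is
// completed only after the job feeding it has finished.
class FutureCollector<T> {
    private final List<T> buffer = new ArrayList<>();
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();

    // Called for every element that reaches the sink.
    synchronized void add(T element) {
        buffer.add(element);
    }

    // Called once by the environment after execution has finished.
    synchronized void finish() {
        result.complete(new ArrayList<>(buffer));
    }

    CompletableFuture<List<T>> future() {
        return result;
    }
}

// Hypothetical stand-in for StreamExecutionEnvironment: it remembers every
// collector registered while the job is being built and completes all of
// them only after the (simulated) job has run to completion.
class SimulatedEnv {
    private final List<FutureCollector<?>> collectors = new ArrayList<>();

    <T> FutureCollector<T> collector() {
        FutureCollector<T> c = new FutureCollector<>();
        collectors.add(c);
        return c;
    }

    void execute(Runnable job) {
        job.run();
        for (FutureCollector<?> c : collectors) {
            c.finish();
        }
    }
}

class FutureCollectDemo {
    public static void main(String[] args) {
        SimulatedEnv env = new SimulatedEnv();
        FutureCollector<Integer> printSink = env.collector();
        FutureCollector<String> otherSink = env.collector();
        CompletableFuture<List<Integer>> printSinkResults = printSink.future();
        CompletableFuture<List<String>> otherSinkResults = otherSink.future();

        // A single execution feeds both "streams".
        env.execute(() -> {
            for (int i = 1; i <= 3; i++) {
                printSink.add(i);
                otherSink.add("value-" + i);
            }
        });

        System.out.println(printSinkResults.join()); // [1, 2, 3]
        System.out.println(otherSinkResults.join()); // [value-1, value-2, value-3]
    }
}
```

Completing the Futures inside execute(), rather than per element, is what allows several streams to be collected in one run. The cost is that test code has to handle Futures.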

Do you know of any other efforts to make testing easier (apart from external
libraries such as https://github.com/ottogroup/flink-spector)?

I see that [~kenmy] has already put effort into this, but I'd also be happy to
take up this issue if nobody's willing to work on it.

> Introduce collect()/count()/print() methods in DataStream API
> -------------------------------------------------------------
>                 Key: FLINK-3133
>                 URL: https://issues.apache.org/jira/browse/FLINK-3133
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Evgeny Kincharov
>             Fix For: 1.0.0
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should 
> be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts 
> of a stream, e.g. by supplying a time period in the arguments to the methods. 
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<DataType> streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable<DataType> sampled = jobClient.sampleStream(streamData, Time.seconds(5));
> {code}
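For the time-based sampling in the issue description, here is a minimal sketch of what sampleStream(streamData, Time.seconds(5)) could do on the client side. It assumes the running job pushes elements to the client through a BlockingQueue. `StreamSampler` and `sample` are hypothetical names. The issue's executeWithControl() and sampleStream() are only proposals, not existing Flink methods. The sketch drains the queue until a fixed window has elapsed.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, NOT Flink API: the running job is modelled as a producer
// pushing elements into a BlockingQueue, and sampling drains that queue until a
// fixed time window has elapsed.
final class StreamSampler {
    private StreamSampler() {}

    static <T> List<T> sample(BlockingQueue<T> stream, Duration window) {
        List<T> sampled = new ArrayList<>();
        long deadline = System.nanoTime() + window.toNanos();
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // sampling window has elapsed
            }
            try {
                // Wait at most until the deadline for the next element.
                T next = stream.poll(remaining, TimeUnit.NANOSECONDS);
                if (next != null) {
                    sampled.add(next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep interrupt status, return what we have
                break;
            }
        }
        return sampled;
    }
}

class SampleDemo {
    public static void main(String[] args) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        // Simulated source: emits an element roughly every 10 ms in the background.
        Thread source = new Thread(() -> {
            for (int i = 0; ; i++) {
                stream.add("event-" + i);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        source.setDaemon(true);
        source.start();

        Iterable<String> sampled = StreamSampler.sample(stream, Duration.ofMillis(100));
        sampled.forEach(System.out::println);
        source.interrupt();
    }
}
```

Because the stream is unbounded, a call like this has to return after a time window rather than at end of input. That is why the proposal passes a duration instead of reusing DataSet-style collect().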

This message was sent by Atlassian JIRA
