[ 
https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821899#comment-15821899
 ] 

Evgeny Kincharov edited comment on FLINK-3133 at 1/13/17 3:42 PM:
------------------------------------------------------------------

Hi [~mxm], I assign this issue to me again. I try to explain how I implemented 
this and some problems that I met with.
1. I planned to make a new sink that only save records passing through. It is 
not hard.
2. We need to have the possibility to execute the stream pipeline in the 
nonblocking mode. You propose add the method executeWithControl() instead the 
execute(). I know this is described in subtasks of FLINK-4272 but I implemented 
executeWithControl() in this issue only for debugging the sink from 1 (of 
course I delete it from here before PR)
3. We need to insert this sink into the pipeline. It is the hardest part of the 
implementation. If I add this sink after executeWithControl() - addSink doesn't 
change the pipeline due to some conversion transformations during 
executeWithControl.
My current state here: https://github.com/kenmy/flink/tree/FLINK-3133_temp
The commit is here: 
https://github.com/kenmy/flink/commit/136d51c79008126ed586f2b50a997b03c84b21b3
As a result, I can't see any simple possibility to add "sample" without 
changing pipeline before start executing, It may be a problem. Possible 
solutions (IMHO) :
1 Change the design to
{code:java}
StreamExecutionEnvironment env = 
StramEnvironment.getStreamExecutionEnvironment(); 
DataStream<Integer> printSink = env.addSource(..).print(); 
ResultQueryable queryObject = env.executeWithResultQueryable(); 
List<Integer> sampled = queryObject.retrieve(printSink, Time.seconds(5)); 
{code}
The main idea is to change the pipeline before the starting execution, not 
after.
{code:java}
StreamExecutionEnvironment env = 
StramEnvironment.getStreamExecutionEnvironment();
Sampler sampler = new Sampler();
DataStream<DataType> streamData = env.addSource(..).map(..).sample(sampler);
JobClient jobClient = env.executeWithControl();
Iterable<DataType> sampled = sampler(Time.seconds(5));
{code}
2. Don't use sink, use another mechanism to intercept DataStream (like 
extending DataStream by method getSampler that will return object which allows 
to enable/disable storing transferred data for any DataStream). IMHO "sink" 
approach looks more lightweight.
What solution do you prefer, I recommend first but I may not know all bussiness 
needs. Or may be you know better solution?
Thanks.


was (Author: kenmy):
Hi [~mxm], I assign this issue to me again. I try to explain how I implemented 
this and some problems that I met with.
1. I planned to make the new sink that only save records passing through. It is 
not hard.
2. We need to have the possibility to execute the stream pipeline in the 
nonblocking mode. You propose add the method executeWithControl() instead the 
execute(). I know this is described in subtasks of FLINK-4272 but I implemented 
executeWithControl() in this issue only for debugging the sink from 1 (of 
course I delete it from here before PR)
3. We need to insert this sink into the pipeline. It is the hardest part of the 
implementation. If I add this sink after executeWithControl() - addSink doesn't 
change the pipeline due to some conversion transformations during 
executeWithControl.
The my current state here: https://github.com/kenmy/flink/tree/FLINK-3133_temp
The commit is here: 
https://github.com/kenmy/flink/commit/136d51c79008126ed586f2b50a997b03c84b21b3
As a result, I can't see the simple possibility to add "sample" without 
changing pipeline before start executing, It may be a problem. Possible 
solutions (IMHO) :
1 Change the design to
{code:java}
StreamExecutionEnvironment env = 
StramEnvironment.getStreamExecutionEnvironment(); 
DataStream<Integer> printSink = env.addSource(..).print(); 
ResultQueryable queryObject = env.executeWithResultQueryable(); 
List<Integer> sampled = queryObject.retrieve(printSink, Time.seconds(5)); 
{code}
The main idea is to change the pipeline before the starting execution, not 
after.
{code:java}
StreamExecutionEnvironment env = 
StramEnvironment.getStreamExecutionEnvironment();
Sampler sampler = new Sampler();
DataStream<DataType> streamData = env.addSource(..).map(..).sample(sampler);
JobClient jobClient = env.executeWithControl();
Iterable<DataType> sampled = sampler(Time.seconds(5));
{code}
2. Don't use sink, use another mechanism to intercept DataStream (like 
extending DataStream by method getSampler that will return object which allows 
to enable/disable storing transferred data for any DataStream). IMHO "sink" 
approach looks more lightweight.
What solution do you prefer, I recommend first but I may not know all bussiness 
needs. Or may be you know better solution?
Thanks.

> Introduce collect()/count()/print() methods in DataStream API
> -------------------------------------------------------------
>
>                 Key: FLINK-3133
>                 URL: https://issues.apache.org/jira/browse/FLINK-3133
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Streaming
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Evgeny Kincharov
>             Fix For: 1.0.0
>
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should 
> be mirrored to the DataStream API. 
> The semantics of the calls are different. We need to be able to sample parts 
> of a stream, e.g. by supplying a time period in the arguments to the methods. 
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = 
> StramEnvironment.getStreamExecutionEnvironment();
> DataStream<DataType> streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable<DataType> sampled = jobClient.sampleStream(streamData, 
> Time.seconds(5));
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to