[
https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821899#comment-15821899
]
Evgeny Kincharov commented on FLINK-3133:
-----------------------------------------
Hi [~mxm], I have assigned this issue to myself again. Let me explain how I
implemented this and some problems I ran into.
1. I planned to create a new sink that simply stores the records passing
through it. That part is not hard.
2. We need the ability to execute the stream pipeline in non-blocking mode.
You proposed adding an executeWithControl() method instead of execute(). I
know this is described in the subtasks of FLINK-4272, but I implemented
executeWithControl() in this issue only for debugging the sink from (1) (of
course I will delete it from here before the PR).
3. We need to insert this sink into the pipeline. This is the hardest part of
the implementation. If I add the sink after executeWithControl(), addSink does
not change the pipeline because of the conversion transformations that happen
during executeWithControl().
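For illustration, the record-saving sink from point 1 could look roughly like
this. This is a minimal sketch with a simplified stand-in for Flink's
SinkFunction interface; CollectSink and retrieve() are assumed names, not
existing API:
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for Flink's SinkFunction interface (illustration only).
interface SinkFunction<T> {
    void invoke(T value);
}

// Hypothetical sink that stores every record passing through it,
// so the buffered records can be sampled later from the client side.
class CollectSink<T> implements SinkFunction<T> {
    private final List<T> buffer = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(T value) {
        // A real implementation would bound the buffer and/or ship it to the client.
        buffer.add(value);
    }

    // Snapshot of everything seen so far.
    public List<T> retrieve() {
        synchronized (buffer) {
            return new ArrayList<>(buffer);
        }
    }
}
{code}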
My current state is here: https://github.com/kenmy/flink/tree/FLINK-3133_temp
The commit is here:
https://github.com/kenmy/flink/commit/136d51c79008126ed586f2b50a997b03c84b21b3
As a result, I don't see a simple way to add a "sample" without changing the
pipeline before execution starts, which may be a problem. Possible solutions
(IMHO):
1. Change the design to:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> printSink = env.addSource(..).print();
ResultQueryable queryObject = env.executeWithResultQueryable();
List<Integer> sampled = queryObject.retrieve(printSink, Time.seconds(5));
{code}
The main idea is to change the pipeline before execution starts, not after.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Sampler sampler = new Sampler();
DataStream<DataType> streamData = env.addSource(..).map(..).sample(sampler);
JobClient jobClient = env.executeWithControl();
Iterable<DataType> sampled = sampler.retrieve(Time.seconds(5));
{code}
2. Don't use a sink; use another mechanism to intercept the DataStream (e.g.
extend DataStream with a getSampler() method that returns an object which
allows enabling/disabling the storing of transferred data for any DataStream).
IMHO the "sink" approach looks more lightweight.
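A rough sketch of what the interceptor object from option 2 might look like.
All names here (StreamSampler, onElement, drain) are assumptions in plain
Java, not existing Flink API:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical interceptor from option 2: attached to a DataStream, it can be
// switched on and off to buffer the records flowing past it.
class StreamSampler<T> {
    private volatile boolean enabled = false;
    private final List<T> stored = new ArrayList<>();

    public void enable()  { enabled = true; }
    public void disable() { enabled = false; }

    // Would be called for every element passing through the intercepted stream.
    public synchronized void onElement(T element) {
        if (enabled) {
            stored.add(element);
        }
    }

    // Returns the buffered elements and clears the buffer.
    public synchronized List<T> drain() {
        List<T> out = new ArrayList<>(stored);
        stored.clear();
        return out;
    }
}
{code}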
Which solution do you prefer? I recommend [1], but I may not know all the
business needs. Or maybe you know a better solution?
Thanks.
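For completeness, the time-bounded retrieval that both designs rely on
(retrieve(printSink, Time.seconds(5)) / sampler-based sampling) could be
sketched with a blocking queue and a deadline. ResultBuffer and its methods
are hypothetical names, not part of any existing API:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical buffer behind retrieve(..): the running job offers elements,
// the client drains whatever arrives within the given time window.
class ResultBuffer<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    public void offer(T element) {
        queue.offer(element);
    }

    // Collect elements for at most timeoutMillis, then return the sample.
    public List<T> retrieve(long timeoutMillis) throws InterruptedException {
        List<T> out = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time window elapsed
            }
            T element = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (element == null) {
                break; // timed out while waiting for the next element
            }
            out.add(element);
        }
        return out;
    }
}
{code}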
> Introduce collect()/count()/print() methods in DataStream API
> -------------------------------------------------------------
>
> Key: FLINK-3133
> URL: https://issues.apache.org/jira/browse/FLINK-3133
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Streaming
> Affects Versions: 0.10.0, 1.0.0, 0.10.1
> Reporter: Maximilian Michels
> Assignee: Evgeny Kincharov
> Fix For: 1.0.0
>
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should
> be mirrored to the DataStream API.
> The semantics of the calls are different. We need to be able to sample parts
> of a stream, e.g. by supplying a time period in the arguments to the methods.
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<DataType> streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable<DataType> sampled = jobClient.sampleStream(streamData,
> Time.seconds(5));
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)