[
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082034#comment-16082034
]
Ziv Meri commented on FLINK-5031:
---------------------------------
In my opinion, none of the descriptions above captures the actual problem. In my
case, the result of split() keeps the correct tags but associates them with the
wrong objects. I can show this in detail later if that would help. Thanks for
the proposed API :-)
> Consecutive DataStream.split() ignored
> --------------------------------------
>
> Key: FLINK-5031
> URL: https://issues.apache.org/jira/browse/FLINK-5031
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Fabian Hueske
> Assignee: Renkai Ge
>
> The output of the following program
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>     long threshold;
>
>     public ThresholdSelector(long threshold) {
>         this.threshold = threshold;
>     }
>
>     @Override
>     public Iterable<String> select(Long value) {
>         if (value < threshold) {
>             return Collections.singletonList("Less");
>         } else {
>             return Collections.singletonList("GreaterEqual");
>         }
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     SplitStream<Long> split1 = env.generateSequence(1, 11)
>         .split(new ThresholdSelector(6));
>     // stream11 should be [1,2,3,4,5]
>     DataStream<Long> stream11 = split1.select("Less");
>     SplitStream<Long> split2 = stream11
> //      .map(new MapFunction<Long, Long>() {
> //          @Override
> //          public Long map(Long value) throws Exception {
> //              return value;
> //          }
> //      })
>         .split(new ThresholdSelector(3));
>     DataStream<Long> stream21 = split2.select("Less");
>     // stream21 should be [1,2]
>     stream21.print();
>     env.execute();
> }
> {code}
> should be {{1, 2}}, but it is {{1, 2, 3, 4, 5}}. It seems that the second
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to
> the program.
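
For reference, the expected result of the two consecutive splits can be sketched in plain Java, without Flink. This is only an illustration of the semantics the report expects, not Flink's implementation: the class name ConsecutiveSplitSketch and the helpers tag and splitAndSelect are hypothetical, and each split followed by select is modeled as a simple filter on the tag name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ConsecutiveSplitSketch {

    // Mirrors ThresholdSelector from the report: every value gets exactly one tag.
    static String tag(long value, long threshold) {
        return value < threshold ? "Less" : "GreaterEqual";
    }

    // Hypothetical stand-in for split(...).select(name): keep values whose tag matches.
    static List<Long> splitAndSelect(List<Long> input, long threshold, String name) {
        return input.stream()
                .filter(v -> tag(v, threshold).equals(name))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same input as env.generateSequence(1, 11) in the report.
        List<Long> source = LongStream.rangeClosed(1, 11).boxed().collect(Collectors.toList());

        // First split with threshold 6, then a second split with threshold 3.
        List<Long> stream11 = splitAndSelect(source, 6, "Less");
        List<Long> stream21 = splitAndSelect(stream11, 3, "Less");

        System.out.println(stream11); // [1, 2, 3, 4, 5]
        System.out.println(stream21); // [1, 2]
    }
}
```

If the second threshold were ignored, the second list would equal the first, which matches the behavior described in the report.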