[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100055#comment-16100055
 ] 

Ray Ruvinskiy commented on FLINK-5031:
--------------------------------------

To flesh out the example a little more: I want to create a side output per tag 
value, but I don't know the full list of tag values ahead of time. 
Additionally, I may ultimately want to consume only a relatively small subset 
of the split streams, and I'm not interested in the main data stream at all. 
To make things more concrete, suppose I'm consuming a 
{{Tuple2<String, String>}}, where the first element is the tag and the second 
is a value. Can I then do something like the following?

{code:java}
SingleOutputStreamOperator<Integer> mainDataStream = input
     .process(new ProcessFunction<Tuple2<String, String>,
         /* what should go here? I don't really care about the main output. */>() {
      @Override
      public void processElement(
          Tuple2<String, String> value,
          Context ctx,
          Collector</* ... */> out) throws Exception {
        // don't care about the regular output; what should I do?
        // out.collect(value);

        ctx.output(new OutputTag<String>(value.f0){}, value.f1);
      }
    });
...
mainDataStream.getSideOutput(new OutputTag<String>("tag1"){}).union(
    mainDataStream.getSideOutput(new OutputTag<String>("tag4"){})).union(
    mainDataStream.getSideOutput(new OutputTag<String>("tag7"){}));
{code}
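
A plain-Java sketch of the intended semantics, with no Flink dependency, may make the question clearer. It only models the desired behaviour: each (tag, value) pair is routed to a per-tag output that is created the first time the tag is seen, and a consumer later picks a small subset of tags to union. The class name {{TagRoutingSketch}}, its methods, and the sample data are hypothetical and are not part of any Flink API; whether side outputs can actually be created this way is exactly what is being asked.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the desired behaviour; this is not Flink code. */
public class TagRoutingSketch {

    /**
     * Routes each {tag, value} pair to a per-tag list. The set of tags is not
     * known up front; a list is created the first time a tag appears.
     */
    public static Map<String, List<String>> routeByTag(List<String[]> input) {
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        for (String[] element : input) {
            String tag = element[0];    // plays the role of value.f0
            String value = element[1];  // plays the role of value.f1
            outputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }
        return outputs;
    }

    /**
     * Concatenates the outputs of the requested tags only. Tags that never
     * occurred contribute nothing. Results are grouped by tag, not by
     * arrival order, which differs from a real streaming union.
     */
    public static List<String> unionOf(Map<String, List<String>> outputs,
                                       List<String> wantedTags) {
        List<String> result = new ArrayList<>();
        for (String tag : wantedTags) {
            result.addAll(outputs.getOrDefault(tag, List.of()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration.
        List<String[]> input = List.of(
            new String[] {"tag1", "a"},
            new String[] {"tag2", "b"},
            new String[] {"tag4", "c"},
            new String[] {"tag1", "d"},
            new String[] {"tag9", "e"});

        Map<String, List<String>> outputs = routeByTag(input);
        // Only tag1, tag4 and tag7 are consumed; tag2 and tag9 are ignored.
        System.out.println(unionOf(outputs, List.of("tag1", "tag4", "tag7")));
    }
}
```

The main output plays no role in this model, which mirrors the situation described above: only the per-tag outputs are of interest.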

> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>       long threshold;
>       public ThresholdSelector(long threshold) {
>               this.threshold = threshold;
>       }
>       @Override
>       public Iterable<String> select(Long value) {
>               if (value < threshold) {
>                       return Collections.singletonList("Less");
>               } else {
>                       return Collections.singletonList("GreaterEqual");
>               }
>       }
> }
> public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       env.setParallelism(1);
>       SplitStream<Long> split1 = env.generateSequence(1, 11)
>               .split(new ThresholdSelector(6));
>       // stream11 should be [1,2,3,4,5]
>       DataStream<Long> stream11 = split1.select("Less");
>       SplitStream<Long> split2 = stream11
> //            .map(new MapFunction<Long, Long>() {
> //                    @Override
> //                    public Long map(Long value) throws Exception {
> //                            return value;
> //                    }
> //            })
>               .split(new ThresholdSelector(3));
>       DataStream<Long> stream21 = split2.select("Less");
>       // stream21 should be [1,2]
>       stream21.print();
>       env.execute();
> }
> {code}
> should be {{1, 2}}; however, it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to 
> the program.



