[ 
https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426937#comment-16426937
 ] 

Julio Biason commented on FLINK-9141:
-------------------------------------

Just a note: We got a problem with split/side outputs too, but instead of NPE, 
we got an error message:

 
{code:java}
java.lang.UnsupportedOperationException: Cannot use split/select with side 
outputs.                                                                        
                                                                                
  
{code}
... and the "map" trick solved this too.

 

> Calling getSideOutput() and split() on one DataStream causes NPE
> ----------------------------------------------------------------
>
>                 Key: FLINK-9141
>                 URL: https://issues.apache.org/jira/browse/FLINK-9141
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Priority: Critical
>
> Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a 
> {{NullPointerException}} to be thrown at runtime.
> As a work-around one can add a no-op map function before the split() call.
> Exception:
> {code}
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Reproducer:
> {code}
> private static final OutputTag<String> tag = new OutputTag<String>("tag") {};
> public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       DataStream<String> dataStream1 = env.fromElements("foo");
>       SingleOutputStreamOperator<String> processedStream = dataStream1
>               .process(new ProcessFunction<String, String>() {
>                       @Override
>                       public void processElement(String value, Context ctx, 
> Collector<String> out) {
>                       }
>               });
>       processedStream.getSideOutput(tag)
>               .print();
>       processedStream
>               .map(record -> record)
>               .split(Collections::singletonList)
>               .select("bar")
>               .print();
>       env.execute();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to