[ https://issues.apache.org/jira/browse/FLINK-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661525#comment-14661525 ]

Fabian Hueske edited comment on FLINK-2495 at 8/7/15 9:00 AM:
--------------------------------------------------------------

[~Huangwei], you are right. I didn't look closely enough, sorry.

However, I don't think it is a good idea to silently ignore {{null}} inputs in 
a {{union}}. First, a {{null}} input indicates a serious problem with the user 
program that should be clearly indicated. Second, ignoring a {{null}} input is 
possible for a {{union}} but not for other operators such as {{comap}} or 
{{join}}. Hence ignoring the {{null}} input for {{union}} would lead to 
inconsistent behavior of the API, in my opinion.


was (Author: fhueske):
[~Huangwei], you are right. I didn't look closely enough, sorry.

However, I don't think it is a good idea to silently ignore {{null}} inputs in 
a {{union}}. First, a {{null}} input indicates a serious problem with the user 
program that should be clearly indicated. Second, ignoring a {{null}} input is 
possible for a {{union}} but not for a {{join}}, {{cogroup}}, {{cross}}, or 
iteration. Hence ignoring the {{null}} input for {{union}} would lead to 
inconsistent behavior of the API, in my opinion.

> Add a null point check in API DataStream.union
> ----------------------------------------------
>
>                 Key: FLINK-2495
>                 URL: https://issues.apache.org/jira/browse/FLINK-2495
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Huang Wei
>             Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> The API method public DataStream<OUT> union(DataStream<OUT>... streams) is an
> external interface for users.
> The "streams" argument may be null (as in the test below, where one of the
> passed streams is null), which results in a NullPointerException.
> The following test illustrates the problem:
> package org.apache.flink.streaming.api;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.junit.Test;
> /**
>  * Created by HuangWHWHW on 2015/8/7.
>  */
> public class test {
>     public static class sourceFunction extends RichParallelSourceFunction<String> {
>         public sourceFunction() {
>         }
>         @Override
>         public void run(SourceContext<String> sourceContext) throws Exception {
>             sourceContext.collect("a");
>         }
>         @Override
>         public void cancel() {
>         }
>     }
>     @Test
>     public void testUnion() {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         DataStream<String> source = env.addSource(new sourceFunction());
>         DataStream<String> temp1 = null;
>         DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String value) throws Exception {
>                 // Compare string contents with equals(), not reference identity (==).
>                 if ("a".equals(value)) {
>                     return "This is for test temp2.";
>                 }
>                 return null;
>             }
>         });
>         // temp1 is null: this union() call throws the NullPointerException.
>         DataStream<String> sink = temp2.union(temp1);
>         sink.print();
>         try {
>             env.execute();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
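A side note on how varargs behave in the test above. For a signature such as union(DataStream<OUT>... streams), Java wraps a single null-valued argument of static type DataStream<String> in a one-element array. So in temp2.union(temp1), the streams array itself is non-null and only its element is null. Only a bare null literal or an explicit array cast passes a null array. The self-contained sketch below illustrates this with a hypothetical describe helper, using String as a stand-in for DataStream<String> so that it has no Flink dependency.

```java
public class VarargsNullDemo {
    // Hypothetical helper: reports what a varargs method actually receives.
    static String describe(String... values) {
        if (values == null) {
            return "array is null";
        }
        int nulls = 0;
        for (String v : values) {
            if (v == null) {
                nulls++;
            }
        }
        return "array of length " + values.length + " with " + nulls + " null element(s)";
    }

    public static void main(String[] args) {
        String temp1 = null;
        System.out.println(describe(temp1));           // array of length 1 with 1 null element(s)
        System.out.println(describe((String[]) null)); // array is null
        System.out.println(describe("a", null));       // array of length 2 with 1 null element(s)
    }
}
```

A null check in union therefore has to cover both the array and each of its elements.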



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
