[ 
https://issues.apache.org/jira/browse/FLINK-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661453#comment-14661453
 ] 

ASF GitHub Bot commented on FLINK-2495:
---------------------------------------

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/999

    [FLINK-2495][fix]Add a null point check in API DataStream.union

    The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an 
external interface for users.
    The parameter "streams" may be null, in which case a NullPointerException 
is thrown.
    
    The test below illustrates the problem:
    
    package org.apache.flink.streaming.api;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.junit.Test;
    
    /**
     * Created by HuangWHWHW on 2015/8/7.
     *
     * Reproduction for FLINK-2495: passes a null stream reference to
     * {@code DataStream.union}, which the accompanying report says results in a
     * NullPointerException.
     */
    public class test {

        /** Source that emits the single record {@code "a"}. */
        public static class sourceFunction extends RichParallelSourceFunction<String> {

            public sourceFunction() {
            }

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                sourceContext.collect("a");
            }

            @Override
            public void cancel() {
                // Nothing to cancel: run() emits one record and returns.
            }
        }

        /**
         * Builds a one-record pipeline, unions the mapped stream with a null
         * stream reference, and executes the job.
         */
        @Test
        public void testUnion() {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> source = env.addSource(new sourceFunction());
            // Deliberately null: this is the argument that triggers the reported problem.
            DataStream<String> temp1 = null;
            DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // Compare by content; '==' only tests reference identity and
                    // is not guaranteed to hold for equal strings.
                    if ("a".equals(value)) {
                        return "This is for test temp2.";
                    }
                    return null;
                }
            });
            DataStream<String> sink = temp2.union(temp1);
            sink.print();
            try {
                env.execute();
            } catch (Exception e) {
                // Execution failures are only printed, so they do not fail the test.
                e.printStackTrace();
            }
        }

    }


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2495

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/999.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #999
    
----
commit 89b4aa6c94e4e00ec382746e41ae893d83b55d86
Author: HuangWHWHW <[email protected]>
Date:   2015-08-07T07:38:21Z

    [FLINK-2495][fix]Add a null point check in API DataStream.union

----


> Add a null point check in API DataStream.union
> ----------------------------------------------
>
>                 Key: FLINK-2495
>                 URL: https://issues.apache.org/jira/browse/FLINK-2495
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Huang Wei
>             Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an 
> external interface for users.
> The parameter "streams" may be null, in which case a NullPointerException 
> is thrown.
> The test below illustrates the problem:
> package org.apache.flink.streaming.api;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.junit.Test;
> /**
>  * Created by HuangWHWHW on 2015/8/7.
>  */
> public class test {
>       public static class sourceFunction extends 
> RichParallelSourceFunction<String> {
>               public sourceFunction() {
>               }
>               @Override
>               public void run(SourceContext<String> sourceContext) throws 
> Exception {
>                       sourceContext.collect("a");
>               }
>               @Override
>               public void cancel() {
>               }
>       }
>       @Test
>       public void testUnion(){
>               StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               env.setParallelism(1);
>               DataStream<String> source = env.addSource(new sourceFunction());
>               DataStream<String> temp1 = null;
>               DataStream<String> temp2 = source.map(new MapFunction<String, 
> String>() {
>                       @Override
>                       public String map(String value) throws Exception {
>                               if (value == "a") {
>                                       return "This is for test temp2.";
>                               }
>                               return null;
>                       }
>               });
>               DataStream<String> sink = temp2.union(temp1);
>               sink.print();
>               try {
>                       env.execute();
>               }catch (Exception e){
>                       e.printStackTrace();
>               }
>       }
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to