[ https://issues.apache.org/jira/browse/FLINK-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661519#comment-14661519 ]
Chesnay Schepler commented on FLINK-2495:
-----------------------------------------
Yes, you can get a NullPointerException if you call temp2.union(null).
I don't see a problem with that, though; it accurately describes the problem.
If we want to prevent NullPointerExceptions from occurring (which in cases like
this is, IMO, preventing something for the sake of preventing something), I'd
propose doing a full sweep of the Environment/DataSet classes, as I already
found a second function that could have the same problem. I'm not keen on
having 25 different issues/PRs every time someone finds an issue like this.
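A sweep like the one proposed could route every varargs parameter through one shared check instead of a separate fix per method. Below is a minimal sketch, assuming a hypothetical `ArgumentChecks` class and `checkNoNullElements` method (neither is part of Flink). It keeps NullPointerException, which the comment argues already describes the problem, but names the parameter and the offending index in the message.

```java
import java.util.Objects;

/**
 * Hypothetical helper (NOT part of Flink) that a sweep over the
 * Environment/DataSet classes could reuse for array/varargs parameters.
 */
public final class ArgumentChecks {

    private ArgumentChecks() {
        // static helpers only
    }

    /**
     * Throws a NullPointerException naming the parameter if the array itself
     * is null, or naming the parameter and index if any element is null.
     */
    public static void checkNoNullElements(Object[] values, String paramName) {
        Objects.requireNonNull(values, paramName + " must not be null");
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                throw new NullPointerException(paramName + "[" + i + "] must not be null");
            }
        }
    }

    public static void main(String[] args) {
        checkNoNullElements(new String[] {"a", "b"}, "streams"); // no exception
        try {
            checkNoNullElements(new String[] {"a", null}, "streams");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints: streams[1] must not be null
        }
    }
}
```

A method such as union could then start with `ArgumentChecks.checkNoNullElements(streams, "streams");`, so the failure happens at the call site with a message that points at the bad argument.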
> Add a null point check in API DataStream.union
> ----------------------------------------------
>
> Key: FLINK-2495
> URL: https://issues.apache.org/jira/browse/FLINK-2495
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Huang Wei
> Fix For: 0.10
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an
> external interface for users.
> The parameter "streams" may be null or contain null elements, in which case
> a NullPointerException is thrown.
> The test below illustrates the problem:
> package org.apache.flink.streaming.api;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.junit.Test;
>
> /**
>  * Created by HuangWHWHW on 2015/8/7.
>  */
> public class test {
>
>     /** Emits a single element "a" and then finishes. */
>     public static class sourceFunction extends RichParallelSourceFunction<String> {
>
>         public sourceFunction() {
>         }
>
>         @Override
>         public void run(SourceContext<String> sourceContext) throws Exception {
>             sourceContext.collect("a");
>         }
>
>         @Override
>         public void cancel() {
>         }
>     }
>
>     @Test
>     public void testUnion() {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>
>         DataStream<String> source = env.addSource(new sourceFunction());
>         DataStream<String> temp1 = null;
>         DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String value) throws Exception {
>                 // Compare string contents with equals(), not reference identity (==).
>                 if ("a".equals(value)) {
>                     return "This is for test temp2.";
>                 }
>                 return null;
>             }
>         });
>
>         // temp1 is null, so union() receives an array holding one null
>         // element; this is the call that fails with a NullPointerException.
>         DataStream<String> sink = temp2.union(temp1);
>         sink.print();
>         try {
>             env.execute();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }
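The check the issue asks for can be sketched without a Flink runtime. `ToyStream` below is a hypothetical stand-in, not Flink's DataStream: it only records which streams were unioned and mirrors the shape of `union(DataStream<OUT>... streams)`. It also shows that the two call patterns in this thread differ. `temp2.union(temp1)` with `temp1 == null` passes an array containing one null element, while a bare `null` argument (as in `temp2.union(null)`) is bound to the array parameter itself, so a fix has to handle both.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical stand-in for DataStream (NOT Flink code): it only records which
 * streams were unioned, so the null checks can be shown in isolation.
 */
public final class ToyStream<T> {

    private final List<String> names;

    private ToyStream(List<String> names) {
        this.names = names;
    }

    public static <T> ToyStream<T> of(String name) {
        return new ToyStream<>(Collections.singletonList(name));
    }

    /** Same shape as union(DataStream<OUT>... streams), but validates eagerly. */
    @SafeVarargs
    public final ToyStream<T> union(ToyStream<T>... streams) {
        // Case 1: a bare null literal is passed as the array itself.
        if (streams == null) {
            throw new NullPointerException("union() called with a null array of streams");
        }
        List<String> merged = new ArrayList<>(names);
        for (int i = 0; i < streams.length; i++) {
            // Case 2: a null variable becomes a null element of the array.
            if (streams[i] == null) {
                throw new NullPointerException("Stream at position " + i + " passed to union() is null");
            }
            merged.addAll(streams[i].names);
        }
        return new ToyStream<>(merged);
    }

    public List<String> names() {
        return Collections.unmodifiableList(names);
    }

    public static void main(String[] args) {
        ToyStream<String> temp2 = ToyStream.of("temp2");
        ToyStream<String> other = ToyStream.of("other");
        ToyStream<String> temp1 = null;

        System.out.println(temp2.union(other).names()); // prints: [temp2, other]
        try {
            temp2.union(temp1);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints: Stream at position 0 passed to union() is null
        }
        try {
            temp2.union((ToyStream<String>[]) null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints: union() called with a null array of streams
        }
    }
}
```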