Huang Wei created FLINK-2495:
--------------------------------

             Summary: Add a null point check in API DataStream.union
                 Key: FLINK-2495
                 URL: https://issues.apache.org/jira/browse/FLINK-2495
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.10
            Reporter: Huang Wei
             Fix For: 0.10

The API (public DataStream<OUT> union(DataStream<OUT>... streams)) is an external interface for users. The parameter "streams" may be null (or, as in the test below, contain a null element), in which case the call throws a NullPointerException.

The test below reproduces the problem:

    package org.apache.flink.streaming.api;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.junit.Test;

    /**
     * Created by HuangWHWHW on 2015/8/7.
     */
    public class test {

        // Minimal source that emits a single element and then finishes.
        public static class sourceFunction extends RichParallelSourceFunction<String> {

            public sourceFunction() {
            }

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                sourceContext.collect("a");
            }

            @Override
            public void cancel() {
            }
        }

        @Test
        public void testUnion() {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStream<String> source = env.addSource(new sourceFunction());

            // temp1 is deliberately null to trigger the problem.
            DataStream<String> temp1 = null;
            DataStream<String> temp2 = source.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // Compare string contents with equals(), not ==.
                    if ("a".equals(value)) {
                        return "This is for test temp2.";
                    }
                    return null;
                }
            });

            // Passing the null stream here throws a NullPointerException.
            DataStream<String> sink = temp2.union(temp1);
            sink.print();

            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
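
To illustrate the kind of check being requested, here is a minimal sketch in plain Java. It does not use Flink: SimpleStream is a hypothetical stand-in for DataStream, and the choice of IllegalArgumentException and the message wording are assumptions, not the actual fix. The idea is only that union validates both the varargs array and each of its elements, so the caller gets a descriptive error instead of a bare NullPointerException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataStream; this is NOT the Flink class.
final class SimpleStream<T> {
    private final List<T> elements;

    SimpleStream(List<T> elements) {
        this.elements = new ArrayList<>(elements);
    }

    List<T> elements() {
        return new ArrayList<>(elements);
    }

    // Rejects a null varargs array and null entries before merging.
    @SafeVarargs
    final SimpleStream<T> union(SimpleStream<T>... streams) {
        if (streams == null) {
            throw new IllegalArgumentException("streams must not be null");
        }
        List<T> merged = new ArrayList<>(elements);
        for (int i = 0; i < streams.length; i++) {
            if (streams[i] == null) {
                throw new IllegalArgumentException("streams[" + i + "] must not be null");
            }
            merged.addAll(streams[i].elements);
        }
        return new SimpleStream<>(merged);
    }
}

public class UnionNullCheckSketch {
    public static void main(String[] args) {
        SimpleStream<String> temp2 = new SimpleStream<>(Arrays.asList("a"));
        SimpleStream<String> temp1 = null;
        try {
            // A single null argument is wrapped into a one-element array.
            temp2.union(temp1);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Note that a call such as temp2.union(temp1) with a null temp1 reaches the method as a one-element array containing null, not as a null array, so the per-element check is the one that matters for the test above.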