This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 2a3b642 [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information 2a3b642 is described below commit 2a3b642b1efb957f3d4f20502c40398786ab1469 Author: Jark Wu <j...@apache.org> AuthorDate: Tue Jul 14 09:58:18 2020 +0800 [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information This closes #12878 --- .../environment/StreamExecutionEnvironment.java | 2 +- .../api/StreamExecutionEnvironmentTest.java | 33 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 70f2cc1..d7922fc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1573,7 +1573,7 @@ public class StreamExecutionEnvironment { @SuppressWarnings("unchecked") public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) { - if (function instanceof ResultTypeQueryable) { + if (typeInfo == null && function instanceof ResultTypeQueryable) { typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType(); } if (typeInfo == null) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index 65ca0ae..df48cdf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.api; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -31,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; @@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest { } } + @Test + public void testAddSourceWithUserDefinedTypeInfo() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Row> source1 = env.addSource(new RowSourceFunction(), Types.ROW(Types.STRING)); + // the source type information should be the user defined type + assertEquals(Types.ROW(Types.STRING), source1.getType()); + + DataStreamSource<Row> source2 = env.addSource(new RowSourceFunction()); + // the source type information should be derived from RowSourceFunction#getProducedType + assertEquals(new GenericTypeInfo<>(Row.class), source2.getType()); + } + ///////////////////////////////////////////////////////////// // Utilities ///////////////////////////////////////////////////////////// @@ -315,4 +331,21 @@ public class StreamExecutionEnvironmentTest { super(num, string); } } + + private static class RowSourceFunction implements SourceFunction<Row>, ResultTypeQueryable<Row> { + private static final long serialVersionUID = 5216362688122691404L; + + @Override + public TypeInformation<Row> getProducedType() { + return TypeInformation.of(Row.class); + } + + @Override + public void run(SourceContext<Row> ctx) throws Exception { + } + + @Override + public void cancel() { + } + } }