This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 38495e7 [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information 38495e7 is described below commit 38495e7378f6df5eead9a29448e3944f2b3ecbea Author: Jark Wu <j...@apache.org> AuthorDate: Tue Jul 14 09:57:36 2020 +0800 [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information This closes #12877 --- .../environment/StreamExecutionEnvironment.java | 2 +- .../api/StreamExecutionEnvironmentTest.java | 33 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 59837ac..d4ddcbd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -2170,7 +2170,7 @@ public class StreamExecutionEnvironment { Class<?> baseSourceClass, TypeInformation<OUT> typeInfo) { TypeInformation<OUT> resolvedTypeInfo = typeInfo; - if (source instanceof ResultTypeQueryable) { + if (resolvedTypeInfo == null && source instanceof ResultTypeQueryable) { resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType(); } if (resolvedTypeInfo == null) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index 65ca0ae..df48cdf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.api; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -31,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; @@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest { } } + @Test + public void testAddSourceWithUserDefinedTypeInfo() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Row> source1 = env.addSource(new RowSourceFunction(), Types.ROW(Types.STRING)); + // the source type information should be the user defined type + assertEquals(Types.ROW(Types.STRING), source1.getType()); + + DataStreamSource<Row> source2 = env.addSource(new RowSourceFunction()); + // the source type information should be derived from RowSourceFunction#getProducedType + assertEquals(new GenericTypeInfo<>(Row.class), source2.getType()); + } + ///////////////////////////////////////////////////////////// // Utilities ///////////////////////////////////////////////////////////// @@ -315,4 +331,21 @@ public 
class StreamExecutionEnvironmentTest { super(num, string); } } + + private static class RowSourceFunction implements SourceFunction<Row>, ResultTypeQueryable<Row> { + private static final long serialVersionUID = 5216362688122691404L; + + @Override + public TypeInformation<Row> getProducedType() { + return TypeInformation.of(Row.class); + } + + @Override + public void run(SourceContext<Row> ctx) throws Exception { + } + + @Override + public void cancel() { + } + } }