This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 38495e7  [FLINK-18539][datastream] Fix 
StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't 
use the user defined type information
38495e7 is described below

commit 38495e7378f6df5eead9a29448e3944f2b3ecbea
Author: Jark Wu <j...@apache.org>
AuthorDate: Tue Jul 14 09:57:36 2020 +0800

    [FLINK-18539][datastream] Fix 
StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't 
use the user defined type information
    
    This closes #12877
---
 .../environment/StreamExecutionEnvironment.java    |  2 +-
 .../api/StreamExecutionEnvironmentTest.java        | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 59837ac..d4ddcbd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2170,7 +2170,7 @@ public class StreamExecutionEnvironment {
                        Class<?> baseSourceClass,
                        TypeInformation<OUT> typeInfo) {
                TypeInformation<OUT> resolvedTypeInfo = typeInfo;
-               if (source instanceof ResultTypeQueryable) {
+               if (resolvedTypeInfo == null && source instanceof 
ResultTypeQueryable) {
                        resolvedTypeInfo = ((ResultTypeQueryable<OUT>) 
source).getProducedType();
                }
                if (resolvedTypeInfo == null) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 65ca0ae..df48cdf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -31,6 +34,7 @@ import 
org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 
@@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest {
                }
        }
 
+       @Test
+       public void testAddSourceWithUserDefinedTypeInfo() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStreamSource<Row> source1 = env.addSource(new 
RowSourceFunction(), Types.ROW(Types.STRING));
+               // the source type information should be the user defined type
+               assertEquals(Types.ROW(Types.STRING), source1.getType());
+
+               DataStreamSource<Row> source2 = env.addSource(new 
RowSourceFunction());
+               // the source type information should be derived from 
RowSourceFunction#getProducedType
+               assertEquals(new GenericTypeInfo<>(Row.class), 
source2.getType());
+       }
+
        /////////////////////////////////////////////////////////////
        // Utilities
        /////////////////////////////////////////////////////////////
@@ -315,4 +331,21 @@ public class StreamExecutionEnvironmentTest {
                        super(num, string);
                }
        }
+
+       private static class RowSourceFunction implements SourceFunction<Row>, 
ResultTypeQueryable<Row> {
+               private static final long serialVersionUID = 
5216362688122691404L;
+
+               @Override
+               public TypeInformation<Row> getProducedType() {
+                       return TypeInformation.of(Row.class);
+               }
+
+               @Override
+               public void run(SourceContext<Row> ctx) throws Exception {
+               }
+
+               @Override
+               public void cancel() {
+               }
+       }
 }

Reply via email to