This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 2a3b642  [FLINK-18539][datastream] Fix 
StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) not 
using the user-defined type information
2a3b642 is described below

commit 2a3b642b1efb957f3d4f20502c40398786ab1469
Author: Jark Wu <j...@apache.org>
AuthorDate: Tue Jul 14 09:58:18 2020 +0800

    [FLINK-18539][datastream] Fix 
StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) not 
using the user-defined type information
    
    This closes #12878
---
 .../environment/StreamExecutionEnvironment.java    |  2 +-
 .../api/StreamExecutionEnvironmentTest.java        | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 70f2cc1..d7922fc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1573,7 +1573,7 @@ public class StreamExecutionEnvironment {
        @SuppressWarnings("unchecked")
        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> 
function, String sourceName, TypeInformation<OUT> typeInfo) {
 
-               if (function instanceof ResultTypeQueryable) {
+               if (typeInfo == null && function instanceof 
ResultTypeQueryable) {
                        typeInfo = ((ResultTypeQueryable<OUT>) 
function).getProducedType();
                }
                if (typeInfo == null) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 65ca0ae..df48cdf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -31,6 +34,7 @@ import 
org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 
@@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest {
                }
        }
 
+       @Test
+       public void testAddSourceWithUserDefinedTypeInfo() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStreamSource<Row> source1 = env.addSource(new 
RowSourceFunction(), Types.ROW(Types.STRING));
+               // the source type information should be the user defined type
+               assertEquals(Types.ROW(Types.STRING), source1.getType());
+
+               DataStreamSource<Row> source2 = env.addSource(new 
RowSourceFunction());
+               // the source type information should be derived from 
RowSourceFunction#getProducedType
+               assertEquals(new GenericTypeInfo<>(Row.class), 
source2.getType());
+       }
+
        /////////////////////////////////////////////////////////////
        // Utilities
        /////////////////////////////////////////////////////////////
@@ -315,4 +331,21 @@ public class StreamExecutionEnvironmentTest {
                        super(num, string);
                }
        }
+
+       private static class RowSourceFunction implements SourceFunction<Row>, 
ResultTypeQueryable<Row> {
+               private static final long serialVersionUID = 
5216362688122691404L;
+
+               @Override
+               public TypeInformation<Row> getProducedType() {
+                       return TypeInformation.of(Row.class);
+               }
+
+               @Override
+               public void run(SourceContext<Row> ctx) throws Exception {
+               }
+
+               @Override
+               public void cancel() {
+               }
+       }
 }

Reply via email to