[FLINK-8649] [scala api] Pass on TypeInfo in StreamExecutionEnvironment.createInput

This closes #5478.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e43f818
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e43f818
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e43f818

Branch: refs/heads/master
Commit: 7e43f81866d3df8e839c9e01e71d6edca06bc8cd
Parents: 129e215
Author: Gabor Gevay <[email protected]>
Authored: Tue Feb 13 18:09:05 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:52 2018 +0100

----------------------------------------------------------------------
 .../streaming/api/scala/StreamExecutionEnvironment.scala      | 7 ++++++-
 .../org/apache/flink/streaming/api/scala/DataStreamTest.scala | 7 +++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e43f818/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index cd96dbf..9410a95 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.configuration.Configuration
@@ -594,7 +595,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   @PublicEvolving
   def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
-    asScalaStream(javaEnv.createInput(inputFormat))
+    if (inputFormat.isInstanceOf[ResultTypeQueryable[_]]) {
+      asScalaStream(javaEnv.createInput(inputFormat))
+    } else {
+      asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
+    }
 
   /**
    * Create a DataStream using a user defined source function for arbitrary

http://git-wip-us.apache.org/repos/asf/flink/blob/7e43f818/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 51ec5e3..9e1c493 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.lang
 
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.java.io.ParallelIteratorInputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
@@ -673,6 +674,12 @@ class DataStreamTest extends AbstractTestBase {
     assert(sg.getIterationSourceSinkPairs.size() == 2)
   }
 
+  @Test
+  def testCreateInputPassesOnTypeInfo(): Unit = {
+    StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple1[Integer]](
+      new ParallelIteratorInputFormat[Tuple1[Integer]](null))
+  }
+
   /////////////////////////////////////////////////////////////
   // Utilities
   /////////////////////////////////////////////////////////////

Reply via email to