[FLINK-8649] [scala api] Pass on TypeInfo in StreamExecutionEnvironment.createInput
This closes #5478.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e43f818
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e43f818
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e43f818

Branch: refs/heads/master
Commit: 7e43f81866d3df8e839c9e01e71d6edca06bc8cd
Parents: 129e215
Author: Gabor Gevay <[email protected]>
Authored: Tue Feb 13 18:09:05 2018 +0100
Committer: zentol <[email protected]>
Committed: Wed Mar 21 21:01:52 2018 +0100

----------------------------------------------------------------------
 .../streaming/api/scala/StreamExecutionEnvironment.scala | 7 ++++++-
 .../org/apache/flink/streaming/api/scala/DataStreamTest.scala | 7 +++++++
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e43f818/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index cd96dbf..9410a95 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.configuration.Configuration
@@ -594,7 +595,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   @PublicEvolving
   def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
-    asScalaStream(javaEnv.createInput(inputFormat))
+    if (inputFormat.isInstanceOf[ResultTypeQueryable[_]]) {
+      asScalaStream(javaEnv.createInput(inputFormat))
+    } else {
+      asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
+    }
 
   /**
    * Create a DataStream using a user defined source function for arbitrary

http://git-wip-us.apache.org/repos/asf/flink/blob/7e43f818/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 51ec5e3..9e1c493 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.lang
 
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.java.io.ParallelIteratorInputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
@@ -673,6 +674,12 @@ class DataStreamTest extends AbstractTestBase {
     assert(sg.getIterationSourceSinkPairs.size() == 2)
   }
 
+  @Test
+  def testCreateInputPassesOnTypeInfo(): Unit = {
+    StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple1[Integer]](
+      new ParallelIteratorInputFormat[Tuple1[Integer]](null))
+  }
+
   /////////////////////////////////////////////////////////////
   // Utilities
   /////////////////////////////////////////////////////////////
