[
https://issues.apache.org/jira/browse/FLINK-5776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885042#comment-15885042
]
ASF GitHub Bot commented on FLINK-5776:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3418#discussion_r103126204
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/MapRunnerTest.scala
---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.functions.{MapFunction,
FlatMapFunction, RichFlatMapFunction,
+RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.runtime.MapRunnerTest.StringSink
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+
+import org.junit.Test
+import org.junit.Assert.{assertEquals, _}
+
+import java.util.Collections
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+class MapRunnerTest extends StreamingMultipleProgramsTestBase {
+
+ @Test
+ def testMapRunnerWithConstructorParameter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val ds: DataStream[Long] = env.generateSequence(0, 5)
+
+ MapRunnerTest.clear
+
+ val body =
+ s"""
+ |return java.lang.Long.valueOf(in1) +
java.lang.Long.valueOf(cardinal);
+ """.stripMargin
+
+ val generator =
+ new CodeGenerator(
+ TableConfig.DEFAULT,
+ false,
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
+
+
generator.addReusableConstructorWithParameterNames(Array[String]("cardinal"),
classOf[Long])
+
+ val genFunction = generator.generateFunction(
+ "myMapFunction",
+ classOf[MapFunction[Long, Long]],
+ body,
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
+
+ val data = Array(java.lang.Long.valueOf(10))
+ val mapRunner =
+ new MapRunner[Long, Long](
+ genFunction.name,
+ genFunction.code,
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ data,
+ Array[Class[_]](classOf[Long])
+ ).asInstanceOf[RichMapFunction[Long, Long]]
+
+ ds.map(mapRunner).addSink(new StringSink())
+
+ env.execute()
+
+ val expected = mutable.MutableList("10", "11", "12", "13", "14", "15")
+ assertEquals(expected.sorted, MapRunnerTest.testResults.sorted)
+ }
+
+ @Test
+ def testFlatMapRunnerWithConstructorParameter(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val ds: DataStream[Long] = env.generateSequence(0, 5)
+
+ MapRunnerTest.clear
+
+ val body =
+ s"""
+ | c.collect(java.lang.Long.valueOf(in1));
+ | c.collect(java.lang.Long.valueOf(cardinal));
+ """.stripMargin
+
+ val generator =
+ new CodeGenerator(
+ TableConfig.DEFAULT,
+ false,
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
+
+
generator.addReusableConstructorWithParameterNames(Array[String]("cardinal"),
classOf[Long])
+
+ val genFunction = generator.generateFunction(
+ "myFlatMapFunction",
+ classOf[FlatMapFunction[Long, Long]],
+ body,
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
+
+ val data = Array(java.lang.Long.valueOf(10))
+ val mapRunner =
+ new FlatMapRunner[Long, Long](
+ genFunction.name,
+ genFunction.code,
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+ data,
+ Array[Class[_]](classOf[Long])
+ ).asInstanceOf[RichFlatMapFunction[Long, Long]]
+
+ ds.flatMap(mapRunner).addSink(new StringSink())
+
+ env.execute()
+
+ val expected =
+ mutable.MutableList("0", "1", "2", "3", "4", "5", "10", "10", "10",
"10", "10", "10")
+ assertEquals(expected.sorted, MapRunnerTest.testResults.sorted)
+ }
+
+}
+
+object MapRunnerTest {
--- End diff --
Can we use `StreamITCase` instead of this? It seems that this is the same
as `StreamITCase`.
> Improve XXMapRunner support create instance by carrying constructor parameters
> ------------------------------------------------------------------------------
>
> Key: FLINK-5776
> URL: https://issues.apache.org/jira/browse/FLINK-5776
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> At present, MapRunner and FlatMapRunner only support creating instances
> without constructor parameters, but sometimes we need to pass constructor
> parameters when instantiating. I would therefore like to improve the
> XXMapRunner classes to support creating instances with constructor parameters.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)