[ 
https://issues.apache.org/jira/browse/FLINK-5776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885042#comment-15885042
 ] 

ASF GitHub Bot commented on FLINK-5776:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3418#discussion_r103126204
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/MapRunnerTest.scala
 ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.common.functions.{MapFunction, 
FlatMapFunction, RichFlatMapFunction,
    +RichMapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
    +import org.apache.flink.table.runtime.MapRunnerTest.StringSink
    +import org.apache.flink.table.codegen.CodeGenerator
    +import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +
    +import org.junit.Test
    +import org.junit.Assert.{assertEquals, _}
    +
    +import java.util.Collections
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +class MapRunnerTest extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testMapRunnerWithConstructorParameter(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val ds: DataStream[Long] = env.generateSequence(0, 5)
    +
    +    MapRunnerTest.clear
    +
    +    val body =
    +      s"""
    +         |return java.lang.Long.valueOf(in1) + 
java.lang.Long.valueOf(cardinal);
    +         """.stripMargin
    +
    +    val generator =
    +      new CodeGenerator(
    +        TableConfig.DEFAULT,
    +        false,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    
generator.addReusableConstructorWithParameterNames(Array[String]("cardinal"), 
classOf[Long])
    +
    +    val genFunction = generator.generateFunction(
    +      "myMapFunction",
    +      classOf[MapFunction[Long, Long]],
    +      body,
    +      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    val data = Array(java.lang.Long.valueOf(10))
    +    val mapRunner =
    +      new MapRunner[Long, Long](
    +        genFunction.name,
    +        genFunction.code,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        data,
    +        Array[Class[_]](classOf[Long])
    +      ).asInstanceOf[RichMapFunction[Long, Long]]
    +
    +    ds.map(mapRunner).addSink(new StringSink())
    +
    +    env.execute()
    +
    +    val expected = mutable.MutableList("10", "11", "12", "13", "14", "15")
    +    assertEquals(expected.sorted, MapRunnerTest.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testFlatMapRunnerWithConstructorParameter(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val ds: DataStream[Long] = env.generateSequence(0, 5)
    +
    +    MapRunnerTest.clear
    +
    +    val body =
    +      s"""
    +         | c.collect(java.lang.Long.valueOf(in1));
    +         | c.collect(java.lang.Long.valueOf(cardinal));
    +         """.stripMargin
    +
    +    val generator =
    +      new CodeGenerator(
    +        TableConfig.DEFAULT,
    +        false,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    
generator.addReusableConstructorWithParameterNames(Array[String]("cardinal"), 
classOf[Long])
    +
    +    val genFunction = generator.generateFunction(
    +      "myFlatMapFunction",
    +      classOf[FlatMapFunction[Long, Long]],
    +      body,
    +      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
    +
    +    val data = Array(java.lang.Long.valueOf(10))
    +    val mapRunner =
    +      new FlatMapRunner[Long, Long](
    +        genFunction.name,
    +        genFunction.code,
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
    +        data,
    +        Array[Class[_]](classOf[Long])
    +      ).asInstanceOf[RichFlatMapFunction[Long, Long]]
    +
    +    ds.flatMap(mapRunner).addSink(new StringSink())
    +
    +    env.execute()
    +
    +    val expected =
    +      mutable.MutableList("0", "1", "2", "3", "4", "5", "10", "10", "10", 
"10", "10", "10")
    +    assertEquals(expected.sorted, MapRunnerTest.testResults.sorted)
    +  }
    +
    +}
    +
    +object MapRunnerTest {
    --- End diff --
    
    Can we use `StreamITCase` instead of this `MapRunnerTest` companion object?
    It seems to be the same as `StreamITCase`.


> Improve XXMapRunner support create instance by carrying constructor parameters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-5776
>                 URL: https://issues.apache.org/jira/browse/FLINK-5776
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> At present, MapRunner and FlatMapRunner only support creating instances
> without constructor parameters, but sometimes we need to pass constructor
> parameters when instantiating, so I would like to improve XXMapRunner to
> support creating instances with constructor parameters.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to