[
https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138764#comment-15138764
]
ASF GitHub Bot commented on FLINK-3226:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1595#discussion_r52291201
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
---
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions.{FlatMapFunction, Function,
MapFunction}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.codegen.OperatorCodeGen._
+import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+class CodeGenerator(
+ config: TableConfig,
+ input1: TypeInformation[Any],
+ input2: Option[TypeInformation[Any]] = None)
+ extends RexVisitor[GeneratedExpression] {
+
+ // Set of statements emitted once into the body of the generated class.
+ // A LinkedHashSet keeps insertion order, so the generated code is stable.
+ private val reusableMemberStatements = mutable.LinkedHashSet[String]()
+
+ // Set of statements emitted once into the constructor of the generated class.
+ // A LinkedHashSet keeps insertion order, so the generated code is stable.
+ private val reusableInitStatements = mutable.LinkedHashSet[String]()
+
+ // Map of input unboxing expressions that are emitted only once:
+ // (inputTerm, index) -> expr
+ // A LinkedHashMap (instead of the hash-ordered default mutable.Map) keeps
+ // insertion order, so reuseInputUnboxingCode() emits the unboxing code
+ // deterministically, in the order the input fields were first accessed.
+ private val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] =
+   mutable.LinkedHashMap[(String, Int), GeneratedExpression]()
+
+ /** All reusable member statements, one per line; always ends with a newline (also when empty). */
+ def reuseMemberCode(): String =
+   reusableMemberStatements.mkString("\n") + "\n"
+
+ /** All reusable constructor statements, one per line; always ends with a newline (also when empty). */
+ def reuseInitCode(): String =
+   reusableInitStatements.mkString("\n") + "\n"
+
+ /** Code of all reusable input unboxing expressions, one per line; always ends with a newline. */
+ def reuseInputUnboxingCode(): String =
+   reusableInputUnboxingExprs.values.map(_.code).mkString("\n") + "\n"
+
+ /** Name of the variable that holds the first input in generated code. */
+ def input1Term: String = "in1"
+
+ /** Name of the variable that holds the second input in generated code. */
+ def input2Term: String = "in2"
+
+ /** Name of the Collector parameter of a generated flatMap method. */
+ def collectorTerm: String = "c"
+
+ /** Name of the variable that holds the reusable output record in generated code. */
+ def outRecordTerm: String = "out"
+
+ /** Whether null checks are generated; taken from the TableConfig. */
+ def nullCheck: Boolean = config.getNullCheck
+
+ /**
+  * Generates code for a single Calcite expression by letting the node
+  * dispatch to the matching visit method of this visitor.
+  */
+ def generateExpression(rex: RexNode): GeneratedExpression = rex.accept(this)
+
+ /**
+  * Generates the Java source of a Flink function class.
+  *
+  * The generated class implements `clazz`, contains all reusable member
+  * statements, runs all reusable init statements in its constructor and, in
+  * its single abstract method, casts the untyped input to the boxed type of
+  * input1 before emitting the input unboxing code and `bodyCode`.
+  *
+  * @param name prefix of the generated class name (made unique via newName)
+  * @param clazz function interface to implement; FlatMapFunction and
+  *              MapFunction are supported so far
+  * @param bodyCode code placed at the end of the function's method body
+  * @param returnType result type stored in the returned GeneratedFunction
+  * @return class name, return type and source code of the generated function
+  * @throws CodeGenException if `clazz` is not a supported function interface
+  */
+ def generateFunction[T <: Function](
+     name: String,
+     clazz: Class[T],
+     bodyCode: String,
+     returnType: TypeInformation[Any])
+   : GeneratedFunction[T] = {
+   val funcName = newName(name)
+
+   // Janino does not support generics: the method takes Object, and this
+   // statement casts the input to its concrete boxed type.
+   def input1Cast: String = {
+     val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+     s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"
+   }
+
+   val (samSignature, samPrologue) =
+     if (clazz == classOf[FlatMapFunction[_, _]]) {
+       (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
+         input1Cast)
+     } else if (clazz == classOf[MapFunction[_, _]]) {
+       ("Object map(Object _in1)", input1Cast)
+     } else {
+       // TODO more functions
+       throw new CodeGenException("Unsupported Function.")
+     }
+
+   val funcCode = j"""
+ public class $funcName
+ implements ${clazz.getCanonicalName} {
+
+ ${reuseMemberCode()}
+
+ public $funcName() {
+ ${reuseInitCode()}
+ }
+
+ @Override
+ public $samSignature {
+ $samPrologue
+ ${reuseInputUnboxingCode()}
+ $bodyCode
+ }
+ }
+ """.stripMargin
+
+   GeneratedFunction(funcName, returnType, funcCode)
+ }
+
+ /**
+  * Generates an expression that converts the input(s) into `returnType` by
+  * forwarding all fields of input1, followed by all fields of input2 (if
+  * there is a second input), in positional order.
+  *
+  * @param returnType target type; its arity must equal the summed input arity
+  * @return the generated result expression
+  */
+ def generateConverterResultExpression(
+     returnType: TypeInformation[_ <: Any])
+   : GeneratedExpression = {
+   val firstInputFields = (0 until input1.getArity)
+     .map(idx => generateInputAccess(input1, input1Term, idx))
+
+   val secondInputFields = input2.toList.flatMap { secondType =>
+     (0 until secondType.getArity).map(idx => generateInputAccess(secondType, input2Term, idx))
+   }
+
+   generateResultExpression(firstInputFields ++ secondInputFields, returnType)
+ }
+
+ /**
+  * Generates code for each Calcite expression (in order) and combines the
+  * results into a single expression of `returnType`.
+  */
+ def generateResultExpression(
+     returnType: TypeInformation[_ <: Any],
+     rexNodes: Seq[RexNode])
+   : GeneratedExpression =
+   generateResultExpression(rexNodes.map(rex => generateExpression(rex)), returnType)
+
+ /**
+  * Combines already generated field expressions into one result expression
+  * of `returnType`.
+  *
+  * Per result type:
+  *  - RowTypeInfo: fields of the reusable output record are set by position;
+  *    a null field is stored as null.
+  *  - PojoTypeInfo: fields of the reusable output record are assigned by name;
+  *    a null field is stored as null.
+  *  - TupleTypeInfo: fields f0, f1, ... of the reusable output record are
+  *    assigned; a null result makes the generated code throw a NullPointerException.
+  *  - CaseClassTypeInfo: a new instance is constructed from the field results;
+  *    a null result makes the generated code throw a NullPointerException.
+  *  - AtomicType: the single field expression is the result; a null result
+  *    makes the generated code throw a NullPointerException.
+  * The null handling code is only generated when nullCheck is enabled.
+  *
+  * @param fieldExprs generated expressions, one per field of `returnType`, in field order
+  * @param returnType type of the result
+  * @return expression whose resultTerm holds the result and whose nullTerm is "false"
+  * @throws CodeGenException if the arity or a field type of `returnType` does not
+  *                          match `fieldExprs`, or if `returnType` is not supported
+  */
+ def generateResultExpression(
+ fieldExprs: Seq[GeneratedExpression],
+ returnType: TypeInformation[_ <: Any])
+ : GeneratedExpression = {
+ // initial type check
+ if (returnType.getArity != fieldExprs.length) {
+ throw new CodeGenException("Arity of result type does not match
number of expressions.")
+ }
+ // type check
+ // every field expression must have exactly the type of the corresponding result field
+ returnType match {
+ case ct: CompositeType[_] =>
+ fieldExprs.zipWithIndex foreach {
+ case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) =>
+ throw new CodeGenException("Incompatible types of expression
and result type.")
+ case _ => // ok
+ }
+ // fieldExprs.head is safe here because of the arity check above, assuming
+ // atomic types report an arity of 1 — TODO confirm
+ case at: AtomicType[_] if at != fieldExprs.head.resultType =>
+ throw new CodeGenException("Incompatible types of expression and
result type.")
+ case _ => // ok
+ }
+
+ // boxed Java type name of the result; only the case class branch below uses it.
+ // NOTE(review): it is computed for every return type and could be moved into that branch.
+ val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
+
+ // generate result expression
+ returnType match {
+ case ri: RowTypeInfo =>
+ // Row: set the fields of the reusable output record by position
+ addReusableOutRecord(ri)
+ val resultSetters: String = fieldExprs.zipWithIndex map {
+ case (fieldExpr, i) =>
+ if (nullCheck) {
+ s"""
+ |${fieldExpr.code}
+ |if (${fieldExpr.nullTerm}) {
+ | $outRecordTerm.setField($i, null);
+ |}
+ |else {
+ | $outRecordTerm.setField($i, ${fieldExpr.resultTerm});
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${fieldExpr.code}
+ |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
+ |""".stripMargin
+ }
+ } mkString "\n"
+
+ GeneratedExpression(outRecordTerm, "false", resultSetters,
returnType)
+
+ case pj: PojoTypeInfo[_] =>
+ // POJO: assign the fields of the reusable output record directly by name,
+ // pairing fieldExprs with getFieldNames in order.
+ // NOTE(review): assumes getFieldNames has the same order as getTypeAt (used by the
+ // type check above) and that the fields are accessible from the generated class — confirm
+ addReusableOutRecord(pj)
+ val resultSetters: String = fieldExprs.zip(pj.getFieldNames) map {
+ case (fieldExpr, fieldName) =>
+ if (nullCheck) {
+ s"""
+ |${fieldExpr.code}
+ |if (${fieldExpr.nullTerm}) {
+ | $outRecordTerm.$fieldName = null;
+ |}
+ |else {
+ | $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${fieldExpr.code}
+ |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
+ |""".stripMargin
+ }
+ } mkString "\n"
+
+ GeneratedExpression(outRecordTerm, "false", resultSetters,
returnType)
+
+ case tup: TupleTypeInfo[_] =>
+ // Tuple: assign fields f0, f1, ... of the reusable output record; with null checks
+ // enabled, a null result makes the generated code throw a NullPointerException
+ addReusableOutRecord(tup)
+ val resultSetters: String = fieldExprs.zipWithIndex map {
+ case (fieldExpr, i) =>
+ val fieldName = "f" + i
+ if (nullCheck) {
+ s"""
+ |${fieldExpr.code}
+ |if (${fieldExpr.nullTerm}) {
+ | throw new NullPointerException("Null result cannot be
stored in a Tuple.");
+ |}
+ |else {
+ | $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${fieldExpr.code}
+ |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
+ |""".stripMargin
+ }
+ } mkString "\n"
+
+ GeneratedExpression(outRecordTerm, "false", resultSetters,
returnType)
+
+ case cc: CaseClassTypeInfo[_] =>
+ // Case class: emit all field codes, then the optional null checks, then create a
+ // new instance (under a fresh variable name) from the field results via its constructor
+ val fieldCodes: String = fieldExprs.map(_.code).mkString("\n")
+ val constructorParams: String =
fieldExprs.map(_.resultTerm).mkString(", ")
+ val resultTerm = newName(outRecordTerm)
+
+ val nullCheckCode = if (nullCheck) {
+ fieldExprs map { (fieldExpr) =>
+ s"""
+ |if (${fieldExpr.nullTerm}) {
+ | throw new NullPointerException("Null result cannot be
stored in a Case Class.");
+ |}
+ |""".stripMargin
+ } mkString "\n"
+ } else {
+ ""
+ }
+
+ val resultCode =
+ s"""
+ |$fieldCodes
+ |$nullCheckCode
+ |$returnTypeTerm $resultTerm = new
$returnTypeTerm($constructorParams);
+ |""".stripMargin
+
+ GeneratedExpression(resultTerm, "false", resultCode, returnType)
+
+ case a: AtomicType[_] =>
+ // Atomic type: the single field expression itself is the result
+ val fieldExpr = fieldExprs.head
+ val nullCheckCode = if (nullCheck) {
+ s"""
+ |if (${fieldExpr.nullTerm}) {
+ | throw new NullPointerException("Null result cannot be used
for atomic types.");
+ |}
+ |""".stripMargin
+ } else {
+ ""
+ }
+ val resultCode =
+ s"""
+ |${fieldExpr.code}
+ |$nullCheckCode
+ |""".stripMargin
+
+ GeneratedExpression(fieldExpr.resultTerm, "false", resultCode,
returnType)
+
+ case _ =>
+ throw new CodeGenException(s"Unsupported result type: $returnType")
+ }
+ }
+
+ //
----------------------------------------------------------------------------------------------
+
+ /**
+  * Generates access code for a field of input1 or input2.
+  *
+  * The fields of both inputs are numbered consecutively: indexes below the
+  * arity of input1 refer to input1; the remaining ones refer to input2 and are
+  * shifted down by the arity of input1.
+  *
+  * @throws CodeGenException if the index refers to input2 but there is no second input
+  */
+ override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
+   val input1Arity = input1.getArity
+   val index = inputRef.getIndex
+   // Decide on the index alone rather than by comparing type informations:
+   // both inputs may have equal types (e.g. a self-join), in which case an
+   // input2 field must still be shifted by the arity of input1.
+   if (index < input1Arity) {
+     generateInputAccess(input1, input1Term, index)
+   } else {
+     val input2Type = input2.getOrElse(throw new CodeGenException("Invalid input access."))
+     generateInputAccess(input2Type, input2Term, index - input1Arity)
+   }
+ }
+
+ /**
+  * Field access expressions are not supported yet.
+  *
+  * Throws a CodeGenException (like the other unsupported code generation
+  * paths) instead of `???`, whose NotImplementedError is an Error with a
+  * generic message that does not name the offending expression.
+  *
+  * @throws CodeGenException always
+  */
+ override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression =
+   throw new CodeGenException(s"Field access is not supported yet: $rexFieldAccess")
+
+ override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
+ val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
+ val value = literal.getValue3
+ literal.getType.getSqlTypeName match {
+ case BOOLEAN =>
+ generateNonNullLiteral(resultType, literal.getValue3.toString)
+ case TINYINT =>
+ val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
+ if (decimal.isValidByte) {
+ generateNonNullLiteral(resultType, decimal.byteValue().toString)
+ }
+ else {
+ throw new CodeGenException("Decimal can not be converted to
byte.")
+ }
+ case SMALLINT =>
+ val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
+ if (decimal.isValidShort) {
+ generateNonNullLiteral(resultType, decimal.shortValue().toString)
+ }
+ else {
+ throw new CodeGenException("Decimal can not be converted to
short.")
+ }
+ case INTEGER =>
+ val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
+ if (decimal.isValidShort) {
+ generateNonNullLiteral(resultType, decimal.intValue().toString)
+ }
+ else {
+ throw new CodeGenException("Decimal can not be converted to
integer.")
+ }
+ case BIGINT =>
+ val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
+ if (decimal.isValidLong) {
+ generateNonNullLiteral(resultType, decimal.longValue().toString)
+ }
+ else {
+ throw new CodeGenException("Decimal can not be converted to
long.")
+ }
+ case FLOAT =>
+ val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
+ if (decimal.isValidFloat) {
+ generateNonNullLiteral(resultType, decimal.floatValue().toString
+ "f")
+ }
+ else {
+ throw new CodeGenException("Decimal can not be converted to
float.")
+ }
+ case DOUBLE =>
+ val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
+ if (decimal.isValidDouble) {
+ generateNonNullLiteral(resultType,
decimal.doubleValue().toString)
+ }
+ else {
+ throw new CodeGenException("Decimal can not be converted to
double.")
+ }
+ case VARCHAR | CHAR =>
+ generateNonNullLiteral(resultType, value.toString)
--- End diff --
Do we need quotes around the string value?
> Translate optimized logical Table API plans into physical plans representing
> DataSet programs
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all
> relevant operator information (keys, user-code expression, strategy hints,
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink
> RelNodes. We start with a straightforward mapping and later add rules that
> merge several relational operators into a single Flink operator, e.g., merge
> a join followed by a filter. Timo implemented some rules for the first SQL
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)