[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138876#comment-15138876 ]

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301458
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._
    +import org.apache.flink.api.table.codegen.OperatorCodeGen._
    +import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable
    +
    +class CodeGenerator(
    +    config: TableConfig,
    +    input1: TypeInformation[Any],
    +    input2: Option[TypeInformation[Any]] = None)
    +  extends RexVisitor[GeneratedExpression] {
    +
    +  // set of member statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
    +
    +  // set of constructor statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableInitStatements = mutable.LinkedHashSet[String]()
    +
    +  // map of initial input unboxing expressions that will be added only once
    +  // (inputTerm, index) -> expr
    +  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
    +
    +  def reuseMemberCode(): String = {
    +    reusableMemberStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInitCode(): String = {
    +    reusableInitStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInputUnboxingCode(): String = {
    +    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
    +  }
    +
    +  def input1Term = "in1"
    +
    +  def input2Term = "in2"
    +
    +  def collectorTerm = "c"
    +
    +  def outRecordTerm = "out"
    +
    +  def nullCheck: Boolean = config.getNullCheck
    +
    +  def generateExpression(rex: RexNode): GeneratedExpression = {
    +    rex.accept(this)
    +  }
    +
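    +  // Generates a wrapper class that implements the given SAM interface
    +  // (currently MapFunction or FlatMapFunction) around the given body code.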
    +  def generateFunction[T <: Function](
    +      name: String,
    +      clazz: Class[T],
    +      bodyCode: String,
    +      returnType: TypeInformation[Any])
    +    : GeneratedFunction[T] = {
    +    val funcName = newName(name)
    +
    +    // Janino does not support generics, which is why we need
    +    // to cast manually here
    +    val samHeader =
    +      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        (s"void flatMap(Object _in1, org.apache.flink.util.Collector 
$collectorTerm)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else if (clazz == classOf[MapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        ("Object map(Object _in1)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else {
    +        // TODO more functions
    +        throw new CodeGenException("Unsupported Function.")
    +      }
    +
    +    val funcCode = j"""
    +      public class $funcName
    +          implements ${clazz.getCanonicalName} {
    +
    +        ${reuseMemberCode()}
    +
    +        public $funcName() {
    +          ${reuseInitCode()}
    +        }
    +
    +        @Override
    +        public ${samHeader._1} {
    +          ${samHeader._2}
    +          ${reuseInputUnboxingCode()}
    +          $bodyCode
    +        }
    +      }
    +    """.stripMargin
    +
    +    GeneratedFunction(funcName, returnType, funcCode)
    +  }
    +
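    +  // Generates an expression that copies all fields of the input(s) into
    +  // an instance of the given return type (a pure conversion, no RexNodes).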
    +  def generateConverterResultExpression(
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    val input1AccessExprs = for (i <- 0 until input1.getArity)
    +      yield generateInputAccess(input1, input1Term, i)
    +
    +    val input2AccessExprs = input2 match {
    +      case Some(ti) => for (i <- 0 until ti.getArity)
    +        yield generateInputAccess(ti, input2Term, i)
    +      case None => Seq() // add nothing
    +    }
    +
    +    generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType)
    +  }
    +
    +  def generateResultExpression(
    +      returnType: TypeInformation[_ <: Any],
    +      rexNodes: Seq[RexNode])
    +    : GeneratedExpression = {
    +    val fieldExprs = rexNodes.map(generateExpression)
    +    generateResultExpression(fieldExprs, returnType)
    +  }
    +
    +  def generateResultExpression(
    +      fieldExprs: Seq[GeneratedExpression],
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    // initial type check
    +    if (returnType.getArity != fieldExprs.length) {
    +      throw new CodeGenException("Arity of result type does not match 
number of expressions.")
    +    }
    +    // type check
    +    returnType match {
    +      case ct: CompositeType[_] =>
    +        fieldExprs.zipWithIndex foreach {
    +          case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) =>
    +            throw new CodeGenException("Incompatible types of expression 
and result type.")
    +          case _ => // ok
    +        }
    +      case at: AtomicType[_] if at != fieldExprs.head.resultType =>
    +        throw new CodeGenException("Incompatible types of expression and 
result type.")
    +      case _ => // ok
    +    }
    +
    +    val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
    +
    +    // generate result expression
    +    returnType match {
    +      case ri: RowTypeInfo =>
    +        addReusableOutRecord(ri)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            if (nullCheck) {
    +              s"""
    +              |${fieldExpr.code}
    +              |if (${fieldExpr.nullTerm}) {
    +              |  $outRecordTerm.setField($i, null);
    +              |}
    +              |else {
    +              |  $outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |}
    +              |""".stripMargin
    +            }
    +            else {
    +              s"""
    +              |${fieldExpr.code}
    +              |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case pj: PojoTypeInfo[_] =>
    +        addReusableOutRecord(pj)
    +        val resultSetters: String = fieldExprs.zip(pj.getFieldNames) map {
    +          case (fieldExpr, fieldName) =>
    +            if (nullCheck) {
    +              s"""
    +              |${fieldExpr.code}
    +              |if (${fieldExpr.nullTerm}) {
    +              |  $outRecordTerm.$fieldName = null;
    +              |}
    +              |else {
    +              |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +              |}
    +              |""".stripMargin
    +            }
    +            else {
    +              s"""
    +              |${fieldExpr.code}
    +              |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +              |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case tup: TupleTypeInfo[_] =>
    +        addReusableOutRecord(tup)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            val fieldName = "f" + i
    +            if (nullCheck) {
    +              s"""
    +                |${fieldExpr.code}
    +                |if (${fieldExpr.nullTerm}) {
    +                |  throw new NullPointerException("Null result cannot be stored in a Tuple.");
    +                |}
    +                |else {
    +                |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |}
    +                |""".stripMargin
    +            }
    +            else {
    +              s"""
    +                |${fieldExpr.code}
    +                |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case cc: CaseClassTypeInfo[_] =>
    +        val fieldCodes: String = fieldExprs.map(_.code).mkString("\n")
    +        val constructorParams: String = fieldExprs.map(_.resultTerm).mkString(", ")
    +        val resultTerm = newName(outRecordTerm)
    +
    +        val nullCheckCode = if (nullCheck) {
    +          fieldExprs map { fieldExpr =>
    +            s"""
    +              |if (${fieldExpr.nullTerm}) {
    +              |  throw new NullPointerException("Null result cannot be stored in a Case Class.");
    +              |}
    +              |""".stripMargin
    +          } mkString "\n"
    +        } else {
    +          ""
    +        }
    +
    +        val resultCode =
    +          s"""
    +            |$fieldCodes
    +            |$nullCheckCode
    +            |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams);
    +            |""".stripMargin
    +
    +        GeneratedExpression(resultTerm, "false", resultCode, returnType)
    +
    +      case a: AtomicType[_] =>
    +        val fieldExpr = fieldExprs.head
    +        val nullCheckCode = if (nullCheck) {
    +          s"""
    +          |if (${fieldExpr.nullTerm}) {
    +          |  throw new NullPointerException("Null result cannot be used for atomic types.");
    +          |}
    +          |""".stripMargin
    +        } else {
    +          ""
    +        }
    +        val resultCode =
    +          s"""
    +            |${fieldExpr.code}
    +            |$nullCheckCode
    +            |""".stripMargin
    +
    +        GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, returnType)
    +
    +      case _ =>
    +        throw new CodeGenException(s"Unsupported result type: $returnType")
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
    +    // if the inputRef index is within the arity of input1, we access input1; otherwise input2
    +    val input = if (inputRef.getIndex < input1.getArity) {
    +      (input1, input1Term)
    +    } else {
    +      (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term)
    +    }
    +
    +    val index = if (input._1 == input1) {
    +      inputRef.getIndex
    +    } else {
    +      inputRef.getIndex - input1.getArity
    +    }
    +
    +    generateInputAccess(input._1, input._2, index)
    +  }
    +
    +  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = ???
    +
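    +  // Numeric literals arrive from Calcite as java.math.BigDecimal and must
    +  // be range-checked before being narrowed to the target primitive type.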
    +  override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
    +    val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
    +    val value = literal.getValue3
    +    literal.getType.getSqlTypeName match {
    +      case BOOLEAN =>
    +        generateNonNullLiteral(resultType, value.toString)
    +      case TINYINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidByte) {
    +          generateNonNullLiteral(resultType, decimal.byteValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
byte.")
    +        }
    +      case SMALLINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidShort) {
    +          generateNonNullLiteral(resultType, decimal.shortValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
short.")
    +        }
    +      case INTEGER =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidInt) {
    +          generateNonNullLiteral(resultType, decimal.intValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
integer.")
    +        }
    +      case BIGINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidLong) {
    +          generateNonNullLiteral(resultType, decimal.longValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
long.")
    +        }
    +      case FLOAT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidFloat) {
    +          generateNonNullLiteral(resultType, decimal.floatValue().toString + "f")
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
float.")
    +        }
    +      case DOUBLE =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidDouble) {
    +          generateNonNullLiteral(resultType, decimal.doubleValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
double.")
    +        }
    +      case VARCHAR | CHAR =>
    +        generateNonNullLiteral(resultType, value.toString)
    --- End diff ---
    
    Yes, this is a bug. Thanks.
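    
    For reference, a possible fix (assuming the issue is that the raw string
    value is embedded unquoted into the generated Java source) could escape
    and quote the value so the generated code compiles. This is only a sketch,
    not necessarily the change that was merged:
    
        case VARCHAR | CHAR =>
          // escape backslashes and quotes, then wrap the value in double
          // quotes so it becomes a valid Java string literal
          val escaped = value.toString.replace("\\", "\\\\").replace("\"", "\\\"")
          generateNonNullLiteral(resultType, "\"" + escaped + "\"")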


> Translate optimized logical Table API plans into physical plans representing 
> DataSet programs
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-3226
>                 URL: https://issues.apache.org/jira/browse/FLINK-3226
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see 
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1 
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream 
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one 
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all 
> relevant operator information (keys, user-code expression, strategy hints, 
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink 
> RelNodes. We start with a straightforward mapping and later add rules that 
> merge several relational operators into a single Flink operator, e.g., merge 
> a join followed by a filter. Timo implemented some rules for the first SQL 
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process
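
As an illustration of the last two points, a minimal translation rule following
Calcite's ConverterRule pattern could look like the sketch below. The names
DataSetConvention and DataSetFilter are hypothetical placeholders, not the
classes of the final implementation:

    import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rel.convert.ConverterRule
    import org.apache.calcite.rel.logical.LogicalFilter

    // Converts a logical filter into a hypothetical Flink DataSetFilter
    // RelNode that holds the condition for later code generation.
    class DataSetFilterRule extends ConverterRule(
        classOf[LogicalFilter],
        Convention.NONE,
        DataSetConvention.INSTANCE, // hypothetical Flink physical convention
        "DataSetFilterRule") {

      override def convert(rel: RelNode): RelNode = {
        val filter = rel.asInstanceOf[LogicalFilter]
        val traitSet: RelTraitSet =
          filter.getTraitSet.replace(DataSetConvention.INSTANCE)
        // translate the input to the DataSet convention first, then wrap it
        // in the physical node that carries the filter condition
        new DataSetFilter(
          filter.getCluster,
          traitSet,
          RelOptRule.convert(filter.getInput, DataSetConvention.INSTANCE),
          filter.getCondition)
      }
    }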



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
