Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301458
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, 
MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._
    +import org.apache.flink.api.table.codegen.OperatorCodeGen._
    +import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable
    +
    +class CodeGenerator(
    +    config: TableConfig,
    +    input1: TypeInformation[Any],
    +    input2: Option[TypeInformation[Any]] = None)
    +  extends RexVisitor[GeneratedExpression] {
    +
    +  // Member statements of the generated class; each distinct statement is kept once.
    +  // A LinkedHashSet is used so that statements are emitted in insertion order.
    +  private val reusableMemberStatements: mutable.LinkedHashSet[String] =
    +    mutable.LinkedHashSet.empty[String]
    +
    +  // Constructor statements of the generated class; each distinct statement is kept once.
    +  // A LinkedHashSet is used so that statements are emitted in insertion order.
    +  private val reusableInitStatements: mutable.LinkedHashSet[String] =
    +    mutable.LinkedHashSet.empty[String]
    +
    +  // Initial input unboxing expressions, stored once per accessed input field.
    +  // Key: (inputTerm, index), value: the expression for that field.
    +  private val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] =
    +    mutable.Map.empty[(String, Int), GeneratedExpression]
    +
    +  /**
    +    * @return all reusable member statements, one per line, followed by a trailing newline
    +    */
    +  def reuseMemberCode(): String =
    +    reusableMemberStatements.mkString("\n") + "\n"
    +
    +  /**
    +    * @return all reusable constructor statements, one per line, followed by a trailing newline
    +    */
    +  def reuseInitCode(): String =
    +    reusableInitStatements.mkString("\n") + "\n"
    +
    +  /**
    +    * @return the code of all registered input unboxing expressions, one per line,
    +    *         followed by a trailing newline
    +    */
    +  def reuseInputUnboxingCode(): String = {
    +    val unboxingCodes = for (expr <- reusableInputUnboxingExprs.values) yield expr.code
    +    unboxingCodes.mkString("\n") + "\n"
    +  }
    +
    +  /** Name of the variable holding the first input record in generated code. */
    +  def input1Term: String = "in1"
    +
    +  /** Name of the variable holding the second input record in generated code. */
    +  def input2Term: String = "in2"
    +
    +  /** Name of the collector parameter in generated code. */
    +  def collectorTerm: String = "c"
    +
    +  /** Name of the reusable output record in generated code. */
    +  def outRecordTerm: String = "out"
    +
    +  /** Whether null checks are generated, as configured in the [[TableConfig]]. */
    +  def nullCheck: Boolean = config.getNullCheck
    +
    +  /**
    +    * Generates an expression for the given RexNode by letting it accept this visitor.
    +    *
    +    * @param rex node to translate
    +    * @return the expression produced by the matching visit method
    +    */
    +  def generateExpression(rex: RexNode): GeneratedExpression = rex.accept(this)
    +
    +  /**
    +    * Wraps the given body code into the source of a Java class that implements `clazz`.
    +    * The class contains the reusable member statements, a constructor with the reusable
    +    * init statements, and the single abstract method of the function (input cast,
    +    * reusable input unboxing code, then `bodyCode`).
    +    *
    +    * @param name base name of the generated class; a unique name is derived from it
    +    * @param clazz function interface to implement (FlatMapFunction or MapFunction)
    +    * @param bodyCode code placed inside the function method
    +    * @param returnType type information passed on to the resulting GeneratedFunction
    +    * @throws CodeGenException if `clazz` is neither FlatMapFunction nor MapFunction
    +    * @return the generated function (unique name, return type, source code)
    +    */
    +  def generateFunction[T <: Function](
    +      name: String,
    +      clazz: Class[T],
    +      bodyCode: String,
    +      returnType: TypeInformation[Any])
    +    : GeneratedFunction[T] = {
    +    val funcName = newName(name)
    +
    +    // Janino does not support generics, so the method receives a plain Object
    +    // which is cast manually to the boxed type of the first input.
    +    def castInputStatement: String = {
    +      val boxedInputType = boxedTypeTermForTypeInfo(input1)
    +      s"$boxedInputType $input1Term = ($boxedInputType) _in1;"
    +    }
    +
    +    // (method signature, first statement of the method body)
    +    val (methodSignature, inputCast) =
    +      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
    +          castInputStatement)
    +      } else if (clazz == classOf[MapFunction[_,_]]) {
    +        ("Object map(Object _in1)", castInputStatement)
    +      } else {
    +        // TODO more functions
    +        throw new CodeGenException("Unsupported Function.")
    +      }
    +
    +    val funcCode = j"""
    +      public class $funcName
    +          implements ${clazz.getCanonicalName} {
    +
    +        ${reuseMemberCode()}
    +
    +        public $funcName() {
    +          ${reuseInitCode()}
    +        }
    +
    +        @Override
    +        public $methodSignature {
    +          $inputCast
    +          ${reuseInputUnboxingCode()}
    +          $bodyCode
    +        }
    +      }
    +    """.stripMargin
    +
    +    GeneratedFunction(funcName, returnType, funcCode)
    +  }
    +
    +  /**
    +    * Generates a result expression that copies every field of the first input, followed
    +    * by every field of the second input (if there is one), into a record of `returnType`.
    +    *
    +    * @param returnType type of the record to produce
    +    * @return expression that builds the result record from the input fields
    +    */
    +  def generateConverterResultExpression(
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    val firstInputFields =
    +      (0 until input1.getArity).map(i => generateInputAccess(input1, input1Term, i))
    +
    +    // contributes nothing when there is no second input
    +    val secondInputFields = input2.toSeq.flatMap { ti =>
    +      (0 until ti.getArity).map(i => generateInputAccess(ti, input2Term, i))
    +    }
    +
    +    generateResultExpression(firstInputFields ++ secondInputFields, returnType)
    +  }
    +
    +  /**
    +    * Generates an expression for each RexNode and combines the results into a record
    +    * of `returnType`.
    +    *
    +    * @param returnType type of the record to produce
    +    * @param rexNodes one node per field of the result
    +    * @return expression that builds the result record
    +    */
    +  def generateResultExpression(
    +      returnType: TypeInformation[_ <: Any],
    +      rexNodes: Seq[RexNode])
    +    : GeneratedExpression = {
    +    val generatedFields = for (node <- rexNodes) yield generateExpression(node)
    +    generateResultExpression(generatedFields, returnType)
    +  }
    +
    +  /**
    +    * Generates the code that stores the given field expressions in a record of
    +    * `returnType` and returns an expression referring to that record.
    +    *
    +    * Supported result types are RowTypeInfo, PojoTypeInfo, TupleTypeInfo,
    +    * CaseClassTypeInfo and AtomicType. With `nullCheck` enabled, null fields are
    +    * written as null for rows and POJOs; for tuples, case classes and atomic types
    +    * the generated code throws a NullPointerException instead.
    +    * The null term of the returned expression is always "false".
    +    *
    +    * @param fieldExprs one expression per field of the result
    +    * @param returnType type of the record to produce
    +    * @throws CodeGenException if the arity or field types of `returnType` do not match
    +    *                          `fieldExprs`, or if `returnType` is not supported
    +    * @return expression whose code fills the result and whose result term names it
    +    */
    +  def generateResultExpression(
    +      fieldExprs: Seq[GeneratedExpression],
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    // initial type check: the result needs exactly one expression per field
    +    if (returnType.getArity != fieldExprs.length) {
    +      throw new CodeGenException("Arity of result type does not match 
number of expressions.")
    +    }
    +    // type check: every expression type must equal the corresponding result field type
    +    returnType match {
    +      case ct: CompositeType[_] =>
    +        fieldExprs.zipWithIndex foreach {
    +          case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) =>
    +            throw new CodeGenException("Incompatible types of expression 
and result type.")
    +          case _ => // ok
    +        }
    +      case at: AtomicType[_] if at != fieldExprs.head.resultType =>
    +        throw new CodeGenException("Incompatible types of expression and 
result type.")
    +      case _ => // ok
    +    }
    +
    +    val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
    +
    +    // generate result expression
    +    returnType match {
    +      // Row: fields are written by position via setField; null fields are stored as null
    +      case ri: RowTypeInfo =>
    +        // NOTE(review): presumably registers the reusable `out` record member - confirm
    +        addReusableOutRecord(ri)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            if (nullCheck) {
    +              s"""
    +              |${fieldExpr.code}
    +              |if (${fieldExpr.nullTerm}) {
    +              |  $outRecordTerm.setField($i, null);
    +              |}
    +              |else {
    +              |  $outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |}
    +              |""".stripMargin
    +            }
    +            else {
    +              s"""
    +              |${fieldExpr.code}
    +              |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, 
returnType)
    +
    +      // POJO: fields are assigned directly by name; null fields are stored as null
    +      case pj: PojoTypeInfo[_] =>
    +        addReusableOutRecord(pj)
    +        val resultSetters: String = fieldExprs.zip(pj.getFieldNames) map {
    +        case (fieldExpr, fieldName) =>
    +          if (nullCheck) {
    +            s"""
    +            |${fieldExpr.code}
    +            |if (${fieldExpr.nullTerm}) {
    +            |  $outRecordTerm.$fieldName = null;
    +            |}
    +            |else {
    +            |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +            |}
    +            |""".stripMargin
    +          }
    +          else {
    +            s"""
    +            |${fieldExpr.code}
    +            |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +            |""".stripMargin
    +          }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, 
returnType)
    +
    +      // Tuple: fields f0, f1, ... are assigned directly; a null field makes the
    +      // generated code throw a NullPointerException
    +      case tup: TupleTypeInfo[_] =>
    +        addReusableOutRecord(tup)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            val fieldName = "f" + i
    +            if (nullCheck) {
    +              s"""
    +                |${fieldExpr.code}
    +                |if (${fieldExpr.nullTerm}) {
    +                |  throw new NullPointerException("Null result cannot be 
stored in a Tuple.");
    +                |}
    +                |else {
    +                |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |}
    +                |""".stripMargin
    +            }
    +            else {
    +              s"""
    +                |${fieldExpr.code}
    +                |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, 
returnType)
    +
    +      // Case class: no reusable record is used here; a new instance is created
    +      // through the constructor, passing all field results as parameters
    +      case cc: CaseClassTypeInfo[_] =>
    +        val fieldCodes: String = fieldExprs.map(_.code).mkString("\n")
    +        val constructorParams: String = 
fieldExprs.map(_.resultTerm).mkString(", ")
    +        // fresh variable name derived from the output record term
    +        val resultTerm = newName(outRecordTerm)
    +
    +        // one null guard per field, emitted after all field code
    +        val nullCheckCode = if (nullCheck) {
    +        fieldExprs map { (fieldExpr) =>
    +          s"""
    +              |if (${fieldExpr.nullTerm}) {
    +              |  throw new NullPointerException("Null result cannot be 
stored in a Case Class.");
    +              |}
    +              |""".stripMargin
    +          } mkString "\n"
    +        } else {
    +          ""
    +        }
    +
    +        val resultCode =
    +          s"""
    +            |$fieldCodes
    +            |$nullCheckCode
    +            |$returnTypeTerm $resultTerm = new 
$returnTypeTerm($constructorParams);
    +            |""".stripMargin
    +
    +        GeneratedExpression(resultTerm, "false", resultCode, returnType)
    +
    +      // Atomic type: the single field expression is the result itself
    +      case a: AtomicType[_] =>
    +        val fieldExpr = fieldExprs.head
    +        val nullCheckCode = if (nullCheck) {
    +          s"""
    +          |if (${fieldExpr.nullTerm}) {
    +          |  throw new NullPointerException("Null result cannot be used 
for atomic types.");
    +          |}
    +          |""".stripMargin
    +        } else {
    +          ""
    +        }
    +        val resultCode =
    +          s"""
    +            |${fieldExpr.code}
    +            |$nullCheckCode
    +            |""".stripMargin
    +
    +        GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, 
returnType)
    +
    +      case _ =>
    +        throw new CodeGenException(s"Unsupported result type: $returnType")
    +    }
    +  }
    +
    +  // 
----------------------------------------------------------------------------------------------
    +
    +  /**
    +    * Generates the access to the input field referenced by `inputRef`.
    +    *
    +    * Reference indices below the arity of input1 address input1 directly. All other
    +    * indices address input2, with the arity of input1 subtracted to obtain the field
    +    * index local to input2.
    +    *
    +    * @param inputRef reference into the concatenated fields of input1 and input2
    +    * @throws CodeGenException if the index lies beyond input1 and there is no input2
    +    * @return expression accessing the referenced input field
    +    */
    +  override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
    +    val globalIndex = inputRef.getIndex
    +    val input1Arity = input1.getArity
    +
    +    // Decide on the index range only. Comparing the chosen type information with
    +    // input1 is not reliable: if input2 has a type equal to input1, the index of a
    +    // reference into input2 would not be shifted.
    +    if (globalIndex < input1Arity) {
    +      generateInputAccess(input1, input1Term, globalIndex)
    +    } else {
    +      val secondInput = input2.getOrElse(throw new CodeGenException("Invalid input access."))
    +      generateInputAccess(secondInput, input2Term, globalIndex - input1Arity)
    +    }
    +  }
    +
    +  /**
    +    * Field accesses are not implemented yet; calling this method throws a
    +    * NotImplementedError.
    +    */
    +  override def visitFieldAccess(
    +      rexFieldAccess: RexFieldAccess)
    +    : GeneratedExpression = {
    +    ???
    +  }
    +
    +  override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
    +    val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
    +    val value = literal.getValue3
    +    literal.getType.getSqlTypeName match {
    +      case BOOLEAN =>
    +        generateNonNullLiteral(resultType, literal.getValue3.toString)
    +      case TINYINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidByte) {
    +          generateNonNullLiteral(resultType, decimal.byteValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
byte.")
    +        }
    +      case SMALLINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidShort) {
    +          generateNonNullLiteral(resultType, decimal.shortValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
short.")
    +        }
    +      case INTEGER =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidShort) {
    +          generateNonNullLiteral(resultType, decimal.intValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
integer.")
    +        }
    +      case BIGINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidLong) {
    +          generateNonNullLiteral(resultType, decimal.longValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
long.")
    +        }
    +      case FLOAT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidFloat) {
    +          generateNonNullLiteral(resultType, decimal.floatValue().toString 
+ "f")
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
float.")
    +        }
    +      case DOUBLE =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidDouble) {
    +          generateNonNullLiteral(resultType, 
decimal.doubleValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to 
double.")
    +        }
    +      case VARCHAR | CHAR =>
    +        generateNonNullLiteral(resultType, value.toString)
    --- End diff --
    
    Yes, this is a bug. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to