[
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265083#comment-16265083
]
ASF GitHub Bot commented on FLINK-7959:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4940#discussion_r152921868
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
---
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{FunctionContext,
UserDefinedFunction}
+
+import scala.collection.mutable
+
+/**
+ * The context for code generator, maintaining various reusable statements
that could be insert
+ * into different code sections in the final generated class.
+ */
+class CodeGeneratorContext {
+
+ // set of member statements that will be added only once
+ private val reusableMemberStatements: mutable.LinkedHashSet[String] =
+ mutable.LinkedHashSet[String]()
+
+ // set of constructor statements that will be added only once
+ private val reusableInitStatements: mutable.LinkedHashSet[String] =
+ mutable.LinkedHashSet[String]()
+
+ // set of statements that will be added only once per record
+ private val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
+ mutable.LinkedHashSet[String]()
+
+ // set of open statements for RichFunction that will be added only once
+ private val reusableOpenStatements: mutable.LinkedHashSet[String] =
+ mutable.LinkedHashSet[String]()
+
+ // set of close statements for RichFunction that will be added only once
+ private val reusableCloseStatements: mutable.LinkedHashSet[String] =
+ mutable.LinkedHashSet[String]()
+
+ // map of initial input unboxing expressions that will be added only once
+ // (inputTerm, index) -> expr
+ private val reusableInputUnboxingExprs: mutable.Map[(String, Int),
GeneratedExpression] =
+ mutable.Map[(String, Int), GeneratedExpression]()
+
+ // set of constructor statements that will be added only once
+ private val reusableConstructorStatements:
mutable.LinkedHashSet[(String, String)] =
+ mutable.LinkedHashSet[(String, String)]()
+
+ /**
+ * @return code block of statements that need to be placed in the member
area of the class
+ * (e.g. member variables and their initialization)
+ */
+ def reuseMemberCode(): String = {
+ reusableMemberStatements.mkString("")
+ }
+
+ /**
+ * @return code block of statements that need to be placed in the
constructor
+ */
+ def reuseInitCode(): String = {
+ reusableInitStatements.mkString("")
+ }
+
+ /**
+ * @return code block of statements that need to be placed in the per
recode process block
+ * (e.g. Function or StreamOperator's processElement)
+ */
+ def reusePerRecordCode(): String = {
+ reusablePerRecordStatements.mkString("")
+ }
+
+ /**
+ * @return code block of statements that need to be placed in the open()
method
+ * (e.g. RichFunction or StreamOperator)
+ */
+ def reuseOpenCode(): String = {
+ reusableOpenStatements.mkString("")
+ }
+
+ /**
+ * @return code block of statements that need to be placed in the
close() method
+ * (e.g. RichFunction or StreamOperator)
+ */
+ def reuseCloseCode(): String = {
+ reusableCloseStatements.mkString("")
+ }
+
+ /**
+ * @return code block of statements that unbox input variables to a
primitive variable
+ * and a corresponding null flag variable
+ */
+ def reuseInputUnboxingCode(): String = {
+ reusableInputUnboxingExprs.values.map(_.code).mkString("")
+ }
+
+ /**
+ * @return code block of constructor statements
+ */
+ def reuseConstructorCode(className: String): String = {
+ reusableConstructorStatements.map { case (params, body) =>
+ s"""
+ |public $className($params) throws Exception {
+ | this();
+ | $body
+ |}
+ |""".stripMargin
+ }.mkString("", "\n", "\n")
+ }
+
+ def addReusableMember(declareCode: String, initCode: String = ""): Unit
= {
+ reusableMemberStatements.add(declareCode)
+ if (!initCode.isEmpty) reusableInitStatements.add(initCode)
+ }
+
+ def addPerRecordStatement(s: String): Unit =
reusablePerRecordStatements.add(s)
+
+ def addReusableOpenStatement(s: String): Unit =
reusableOpenStatements.add(s)
+
+ def addReusableCloseStatement(s: String): Unit =
reusableCloseStatements.add(s)
+
+ def getReusableInputUnboxingExprs(inputTerm: String, index: Int):
Option[GeneratedExpression] =
+ reusableInputUnboxingExprs.get((inputTerm, index))
+
+ def addReusableInputUnboxingExprs(
+ inputTerm: String,
+ index: Int,
+ expr: GeneratedExpression): Unit =
+ reusableInputUnboxingExprs((inputTerm, index)) = expr
+
+ /**
+ * Adds a reusable output record to the member area of the generated
class.
+ * The passed [[TypeInformation]] defines the type class to be
instantiated.
+ */
+ def addReusableOutRecord(ti: TypeInformation[_], outRecordTerm: String):
Unit = {
+ val statement = ti match {
+ case rt: RowTypeInfo =>
+ s"""
+ |final ${ti.getTypeClass.getCanonicalName} $outRecordTerm =
+ | new ${ti.getTypeClass.getCanonicalName}(${rt.getArity});
+ |""".stripMargin
+ case _ =>
+ s"""
+ |final ${ti.getTypeClass.getCanonicalName} $outRecordTerm =
+ | new ${ti.getTypeClass.getCanonicalName}();
+ |""".stripMargin
+ }
+ reusableMemberStatements.add(statement)
+ }
+
+ /**
+ * Adds a reusable [[java.math.BigDecimal]] to the member area of the
generated class.
+ *
+ * @param decimal decimal object to be instantiated during runtime
+ * @return member variable term
+ */
+ def addReusableDecimal(decimal: JBigDecimal): String = decimal match {
+ case JBigDecimal.ZERO => "java.math.BigDecimal.ZERO"
+ case JBigDecimal.ONE => "java.math.BigDecimal.ONE"
+ case JBigDecimal.TEN => "java.math.BigDecimal.TEN"
+ case _ =>
+ val fieldTerm = newName("decimal")
+ val fieldDecimal =
+ s"""
+ |transient java.math.BigDecimal $fieldTerm =
+ | new java.math.BigDecimal("${decimal.toString}");
+ |""".stripMargin
+ reusableMemberStatements.add(fieldDecimal)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable [[java.lang.reflect.Field]] to the member area of the
generated class.
+ * The field can be used for accessing POJO fields more efficiently
during runtime, however,
+ * the field does not have to be public.
+ */
+ def addReusablePrivateFieldAccess(clazz: Class[_], fieldName: String):
String = {
+ val fieldTerm = s"field_${clazz.getCanonicalName.replace('.',
'$')}_$fieldName"
+ val fieldExtraction =
+ s"""
+ |final java.lang.reflect.Field $fieldTerm =
+ |
org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
+ | ${clazz.getCanonicalName}.class, "$fieldName");
+ |""".stripMargin
+ val fieldAccessibility =
+ s"""
+ |$fieldTerm.setAccessible(true);
+ |""".stripMargin
+
+ addReusableMember(fieldExtraction, fieldAccessibility)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable [[java.util.HashSet]] to the member area of the
generated class.
+ */
+ def addReusableSet(elements: Seq[GeneratedExpression]): String = {
+ val fieldTerm = newName("set")
+ val field =
+ s"""
+ |final java.util.Set $fieldTerm;
+ |""".stripMargin
+ val init =
+ s"""
+ |$fieldTerm = new java.util.HashSet();
+ |""".stripMargin
+
+ addReusableMember(field, init)
+
+ elements.foreach { element =>
+ val content =
+ s"""
+ |${element.code}
+ |if (${element.nullTerm}) {
+ | $fieldTerm.add(null);
+ |} else {
+ | $fieldTerm.add(${element.resultTerm});
+ |}
+ |""".stripMargin
+ reusableInitStatements.add(content)
+ }
+
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable array to the member area of the generated class.
+ */
+ def addReusableArray(clazz: Class[_], size: Int): String = {
+ val fieldTerm = newName("array")
+ val classQualifier = clazz.getCanonicalName // works also for int[]
etc.
+ val initArray = classQualifier.replaceFirst("\\[", s"[$size")
+ val fieldArray =
+ s"""
+ |final $classQualifier $fieldTerm = new $initArray;
+ |""".stripMargin
+ reusableMemberStatements.add(fieldArray)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable timestamp to the beginning of the SAM of the
generated class.
+ */
+ def addReusableTimestamp(): String = {
+ val fieldTerm = s"timestamp"
+ val field =
+ s"""
+ |final long $fieldTerm = java.lang.System.currentTimeMillis();
+ |""".stripMargin
+ reusablePerRecordStatements.add(field)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable local timestamp to the beginning of the SAM of the
generated class.
+ */
+ def addReusableLocalTimestamp(): String = {
+ val fieldTerm = s"localtimestamp"
+ val timestamp = addReusableTimestamp()
+ val field =
+ s"""
+ |final long $fieldTerm = $timestamp +
java.util.TimeZone.getDefault().getOffset(timestamp);
+ |""".stripMargin
+ reusablePerRecordStatements.add(field)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable time to the beginning of the SAM of the generated
class.
+ */
+ def addReusableTime(): String = {
+ val fieldTerm = s"time"
+ val timestamp = addReusableTimestamp()
+ // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
+ val field =
+ s"""
+ |final int $fieldTerm = (int) ($timestamp %
${DateTimeUtils.MILLIS_PER_DAY});
+ |if (time < 0) {
+ | time += ${DateTimeUtils.MILLIS_PER_DAY};
+ |}
+ |""".stripMargin
+ reusablePerRecordStatements.add(field)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable local time to the beginning of the SAM of the
generated class.
+ */
+ def addReusableLocalTime(): String = {
+ val fieldTerm = s"localtime"
+ val localtimestamp = addReusableLocalTimestamp()
+ // adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
+ val field =
+ s"""
+ |final int $fieldTerm = (int) ($localtimestamp %
${DateTimeUtils.MILLIS_PER_DAY});
+ |""".stripMargin
+ reusablePerRecordStatements.add(field)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable date to the beginning of the SAM of the generated
class.
+ */
+ def addReusableDate(): String = {
+ val fieldTerm = s"date"
+ val timestamp = addReusableTimestamp()
+ val time = addReusableTime()
+
+ // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
+ val field =
+ s"""
+ |final int $fieldTerm = (int) ($timestamp /
${DateTimeUtils.MILLIS_PER_DAY});
+ |if ($time < 0) {
+ | $fieldTerm -= 1;
+ |}
+ |""".stripMargin
+ reusablePerRecordStatements.add(field)
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable DateFormatter to the member area of the generated
[[Function]].
+ */
+ def addReusableDateFormatter(format: GeneratedExpression): String = {
+ val fieldTerm = newName("dateFormatter")
+
+ val field =
+ s"""
+ |final org.joda.time.format.DateTimeFormatter $fieldTerm;
--- End diff --
Don't forget to generalize this string when rebasing.
> Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
> -------------------------------------------------------------------
>
> Key: FLINK-7959
> URL: https://issues.apache.org/jira/browse/FLINK-7959
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Kurt Young
> Assignee: Kurt Young
>
> Right now {{CodeGenerator}} actually plays two roles: one is responsible for
> generating code from RexNode, and the other keeps lots of reusable
> statements. It makes more sense to split this logic into two dedicated
> classes.
> The new {{CodeGeneratorContext}} will keep all the reusable statements, while
> the new {{ExprCodeGenerator}} will only generate code from RexNode.
> As for classes like {{AggregationCodeGenerator}} or
> {{FunctionCodeGenerator}}, I think they should not be subclasses of
> {{CodeGenerator}}, but should all be standalone classes. They can create an
> {{ExprCodeGenerator}} when they need to generate code from RexNode, and
> they can also generate code by themselves. The {{CodeGeneratorContext}}
> can be passed around to collect all reusable statements and list them in the
> final generated class.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)