JingsongLi commented on a change in pull request #7906: [FLINK-11830][table-planner-blink] Introduce CodeGeneratorContext to maintain reusable statements URL: https://github.com/apache/flink/pull/7906#discussion_r262783703
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala ########## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen + +import org.apache.calcite.avatica.util.DateTimeUtils +import org.apache.flink.api.common.functions.{Function, RuntimeContext} +import org.apache.flink.table.`type`.{InternalType, InternalTypes, RowType} +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenUtils._ +import org.apache.flink.table.dataformat.GenericRow +import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction} +import org.apache.flink.util.InstantiationUtil + +import scala.collection.mutable + +/** + * The context for code generator, maintaining various reusable statements that could be insert + * into different code sections in the final generated class. + */ +class CodeGeneratorContext(val tableConfig: TableConfig) { + + // referenced objects of code generator classes to avoid serialization and base64. 
+ val references: mutable.ArrayBuffer[AnyRef] = new mutable.ArrayBuffer[AnyRef]() + + // set of member statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableMemberStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of constructor statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableInitStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of open statements for RichFunction that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableOpenStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of close statements for RichFunction that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableCloseStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of endInput statements for StreamOperator that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableEndInputStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // set of statements for cleanup dataview that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableCleanupStatements = mutable.LinkedHashSet[String]() + + // set of statements that will be added only once per record; + // code should only update member variables because local variables are not accessible if + // the code needs to be split; + // we use a LinkedHashSet to keep the insertion order + private val reusablePerRecordStatements: mutable.LinkedHashSet[String] = + mutable.LinkedHashSet[String]() + + // map of initial input unboxing expressions that will be added only once + // (inputTerm, index) -> expr + private val reusableInputUnboxingExprs: mutable.Map[(String, Int), 
GeneratedExpression] = + mutable.Map[(String, Int), GeneratedExpression]() + + // set of constructor statements that will be added only once + // we use a LinkedHashSet to keep the insertion order + private val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] = + mutable.LinkedHashSet[(String, String)]() + + // set of inner class definition statements that will be added only once + private val reusableInnerClassDefinitionStatements: mutable.Map[String, String] = + mutable.Map[String, String]() + + // map of string constants that will be added only once + // string_constant -> reused_term + private val reusableStringConstants: mutable.Map[String, String] = mutable.Map[String, String]() + + // map of local variable statements. It will be placed in method if method code not excess + // max code length, otherwise will be placed in member area of the class. The statements + // are maintained for multiple methods, so that it's a map from method_name to variables. + // + // method_name -> local_variable_statements + private val reusableLocalVariableStatements = mutable.Map[String, mutable.LinkedHashSet[String]]() + + /** + * The current method name for [[reusableLocalVariableStatements]]. 
You can start a new + * local variable statements for another method using [[startNewLocalVariableStatement()]] + */ + private var currentMethodNameForLocalVariables = "DEFAULT" + + // --------------------------------------------------------------------------------- + // Getter + // --------------------------------------------------------------------------------- + + def getReusableInputUnboxingExprs(inputTerm: String, index: Int): Option[GeneratedExpression] = + reusableInputUnboxingExprs.get((inputTerm, index)) + + def getNullCheck: Boolean = tableConfig.getNullCheck + + // --------------------------------------------------------------------------------- + // Local Variables for Code Split + // --------------------------------------------------------------------------------- + + /** + * Starts a new local variable statements for a generated class with the given method name. + * @param methodName the method name which the fields will be placed into if code is not split. + */ + def startNewLocalVariableStatement(methodName: String): Unit = { + currentMethodNameForLocalVariables = methodName + reusableLocalVariableStatements(methodName) = mutable.LinkedHashSet[String]() + } + + + /** + * Adds a reusable local variable statement with the given type term and field name. + * The local variable statements will be placed in methods or class member area depends + * on whether the method length excess max code length. + * + * @param fieldName the field name prefix + * @param fieldTypeTerm the field type term + * @return a new generated unique field name + */ + def newReusableLocalVariable(fieldTypeTerm: String, fieldName: String): String = { + val fieldTerm = newName(fieldName) + reusableLocalVariableStatements + .getOrElse(currentMethodNameForLocalVariables, mutable.LinkedHashSet[String]()) + .add(s"$fieldTypeTerm $fieldTerm;") + fieldTerm + } + + /** + * Adds multiple pairs of local variables. 
+ * The local variable statements will be placed in methods or class + * member area depends on whether the method length excess max code length. + * + * @param fieldTypeAndNames pairs of local variables with + * left is field type term and right is field name + * @return the new generated unique field names for each variable pairs + */ + def newReusableLocalFields(fieldTypeAndNames: (String, String)*): Seq[String] = { + val fieldTerms = newNames(fieldTypeAndNames.map(_._2): _*) + fieldTypeAndNames.map(_._1).zip(fieldTerms).foreach { case (fieldTypeTerm, fieldTerm) => + reusableLocalVariableStatements + .getOrElse(currentMethodNameForLocalVariables, mutable.LinkedHashSet[String]()) + .add(s"$fieldTypeTerm $fieldTerm;") + } + fieldTerms + } + + // --------------------------------------------------------------------------------- + // generate reuse code methods + // --------------------------------------------------------------------------------- + + /** + * @return code block of statements that need to be placed in the member area of the class + * (e.g. inner class definition) + */ + def reuseInnerClassDefinitionCode(): String = { + reusableInnerClassDefinitionStatements.values.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the member area of the class + * (e.g. 
member variables and their initialization) + */ + def reuseMemberCode(): String = { + reusableMemberStatements.mkString("\n") + } + + /** + * @return code block of statements that will be placed in the member area of the class + * if generated code is split or in local variables of method + */ + def reuseLocalVariableCode(methodName: String = null): String = { + if (methodName == null) { + reusableLocalVariableStatements(currentMethodNameForLocalVariables).mkString("\n") + } else { + reusableLocalVariableStatements(methodName).mkString("\n") + } + } + + /** + * @return code block of statements that need to be placed in the constructor + */ + def reuseInitCode(): String = { + reusableInitStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the per recode process block + * (e.g. Function or StreamOperator's processElement) + */ + def reusePerRecordCode(): String = { + reusablePerRecordStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the open() method + * (e.g. RichFunction or StreamOperator) + */ + def reuseOpenCode(): String = { + reusableOpenStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the close() method + * (e.g. 
RichFunction or StreamOperator) + */ + def reuseCloseCode(): String = { + reusableCloseStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the endInput() method + * (StreamOperator) + */ + def reuseEndInputCode(): String = { + reusableEndInputStatements.mkString("\n") + } + + /** + * @return code block of statements that need to be placed in the cleanup() method of + * [AggregationsFunction] + */ + def reuseCleanupCode(): String = { + reusableCleanupStatements.mkString("", "\n", "\n") + } + + /** + * @return code block of statements that unbox input variables to a primitive variable + * and a corresponding null flag variable + */ + def reuseInputUnboxingCode(): String = { + reusableInputUnboxingExprs.values.map(_.code).mkString("\n") + } + + /** + * Returns code block of unboxing input variables which belongs to the given inputTerm. + */ + def reuseInputUnboxingCode(inputTerm: String): String = { + val exprs = reusableInputUnboxingExprs.filter { case ((term, _), _) => + inputTerm.equals(term) + } + val codes = for (((_, _), expr) <- exprs) yield expr.code + codes.mkString("\n").trim + } + + /** + * @return code block of constructor statements + */ + def reuseConstructorCode(className: String): String = { + reusableConstructorStatements.map { case (params, body) => + s""" + |public $className($params) throws Exception { + | this(); + | $body + |} + |""".stripMargin + }.mkString("\n") + } + + // --------------------------------------------------------------------------------- + // add reusable code blocks + // --------------------------------------------------------------------------------- + + /** + * Adds a reusable inner class statement with the given class name and class code + */ + def addReusableInnerClass(className: String, statements: String): Unit = { + reusableInnerClassDefinitionStatements(className) = statements + } + + /** + * Adds a reusable member field statement to the member area. 
+ * + * @param memberStatement the member field declare statement + */ + def addReusableMember(memberStatement: String): Unit = { + reusableMemberStatements.add(memberStatement) + } + + /** + * Adds a reusable per record statement + */ + def addReusablePerRecordStatement(s: String): Unit = reusablePerRecordStatements.add(s) + + /** + * Adds a reusable open statement + */ + def addReusableOpenStatement(s: String): Unit = reusableOpenStatements.add(s) + + /** + * Adds a reusable close statement + */ + def addReusableCloseStatement(s: String): Unit = reusableCloseStatements.add(s) + + /** + * Adds a reusable endInput statement + */ + def addReusableEndInputStatement(s: String): Unit = reusableEndInputStatements.add(s) + + /** + * Adds a reusable cleanup statement + */ + def addReusableCleanupStatement(s: String): Unit = reusableCleanupStatements.add(s) + + + /** + * Adds a reusable input unboxing expression + */ + def addReusableInputUnboxingExprs( + inputTerm: String, + index: Int, + expr: GeneratedExpression): Unit = reusableInputUnboxingExprs((inputTerm, index)) = expr + + /** + * Adds a reusable output record statement to member area. + */ + def addReusableOutputRecord( + t: InternalType, + clazz: Class[_], + outRecordTerm: String, + outRecordWriterTerm: Option[String] = None): Unit = { + val statement = generateOutputRecordStatement(t, clazz, outRecordTerm, outRecordWriterTerm) + reusableMemberStatements.add(statement) + } + + /** + * Adds a reusable null [[org.apache.flink.table.dataformat.GenericRow]] to the member area. + */ + def addReusableNullRow(rowTerm: String, arity: Int): Unit = { + addReusableOutputRecord( + new RowType((0 until arity).map(_ => InternalTypes.INT): _*), + classOf[GenericRow], + rowTerm) + } + + /** + * Adds a reusable timestamp to the beginning of the SAM of the generated class. 
+ */ + def addReusableTimestamp(): String = { + val fieldTerm = s"timestamp" + val field = + s""" + |final long $fieldTerm = java.lang.System.currentTimeMillis(); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable local timestamp to the beginning of the SAM of the generated class. + */ + def addReusableLocalTimestamp(): String = { + addReusableTimestamp() + } + + /** + * Adds a reusable time to the beginning of the SAM of the generated class. + */ + def addReusableTime(): String = { + val fieldTerm = s"time" + val timestamp = addReusableTimestamp() + // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime() + val field = + s""" + |final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY}); + |if (time < 0) { + | time += ${DateTimeUtils.MILLIS_PER_DAY}; + |} + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable local time to the beginning of the SAM of the generated class. + */ + def addReusableLocalTime(): String = { + val fieldTerm = s"localtime" + val timeZone = addReusableTimeZone() + val localtimestamp = addReusableLocalTimestamp() + // adopted from org.apache.calcite.runtime.SqlFunctions.localTime() + val field = + s""" + |final int $fieldTerm = (int) ( ($localtimestamp + $timeZone.getOffset($localtimestamp)) + | % ${DateTimeUtils.MILLIS_PER_DAY}); + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable date to the beginning of the SAM of the generated class. 
+ */ + def addReusableDate(): String = { + val fieldTerm = s"date" + val timestamp = addReusableTimestamp() + val time = addReusableTime() + val timeZone = addReusableTimeZone() + + // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate() + val field = + s""" + |final int $fieldTerm = (int) (($timestamp + $timeZone.getOffset($timestamp)) + | / ${DateTimeUtils.MILLIS_PER_DAY}); + |if ($time < 0) { + | $fieldTerm -= 1; + |} + |""".stripMargin + reusablePerRecordStatements.add(field) + fieldTerm + } + + /** + * Adds a reusable TimeZone to the member area of the generated class. + */ + def addReusableTimeZone(): String = { + val zoneID = tableConfig.getTimeZone.getID + val stmt = + s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM = + | java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin + addReusableMember(stmt) + DEFAULT_TIMEZONE_TERM + } + + + /** + * Adds a reusable [[java.util.Random]] to the member area of the generated class. + * + * The seed parameter must be a literal/constant expression. 
+ * + * @return member variable term + */ + def addReusableRandom(seedExpr: Option[GeneratedExpression]): String = { + val fieldTerm = newName("random") + + val field = + s""" + |final java.util.Random $fieldTerm; + |""".stripMargin + + val fieldInit = seedExpr match { + case Some(s) if getNullCheck => + s""" + |${s.code} + |if (!${s.nullTerm}) { + | $fieldTerm = new java.util.Random(${s.resultTerm}); + |} + |else { + | $fieldTerm = new java.util.Random(); + |} + |""".stripMargin + case Some(s) => + s""" + |${s.code} + |$fieldTerm = new java.util.Random(${s.resultTerm}); + |""".stripMargin + case _ => + s""" + |$fieldTerm = new java.util.Random(); + |""".stripMargin + } + + reusableMemberStatements.add(field) + reusableInitStatements.add(fieldInit) + fieldTerm + } + + // we should avoid to invoke this from other class, please use addReusableObject + private def addReferenceObj(obj: AnyRef, className: String = null): String = { + val idx = references.length + + // make a deep copy of the object + val byteArray = InstantiationUtil.serializeObject(obj) + val objCopy: AnyRef = InstantiationUtil.deserializeObject( + byteArray, + Thread.currentThread().getContextClassLoader) Review comment: obj.getClass.getClassLoader? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
