[FLINK-6887] [table] Split up CodeGenerator into several specific CodeGenerators
This closes #4171. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527e7499 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527e7499 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527e7499 Branch: refs/heads/master Commit: 527e7499c5807be138d1ba0a278917190a7d4cf1 Parents: ef0653a Author: Jark Wu <[email protected]> Authored: Fri Jun 23 16:29:20 2017 +0800 Committer: Jark Wu <[email protected]> Committed: Mon Jul 17 16:34:31 2017 +0800 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 4 +- .../codegen/AggregationCodeGenerator.scala | 436 +++++++++++++ .../flink/table/codegen/CodeGenerator.scala | 641 +------------------ .../table/codegen/CollectorCodeGenerator.scala | 100 +++ .../flink/table/codegen/ExpressionReducer.scala | 4 +- .../table/codegen/FunctionCodeGenerator.scala | 177 +++++ .../codegen/InputFormatCodeGenerator.scala | 92 +++ .../flink/table/plan/nodes/CommonCalc.scala | 4 +- .../table/plan/nodes/CommonCorrelate.scala | 16 +- .../flink/table/plan/nodes/CommonScan.scala | 4 +- .../plan/nodes/dataset/DataSetAggregate.scala | 4 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 4 +- .../table/plan/nodes/dataset/DataSetJoin.scala | 4 +- .../nodes/dataset/DataSetSingleRowJoin.scala | 4 +- .../plan/nodes/dataset/DataSetValues.scala | 4 +- .../nodes/dataset/DataSetWindowAggregate.scala | 10 +- .../plan/nodes/datastream/DataStreamCalc.scala | 4 +- .../datastream/DataStreamGroupAggregate.scala | 5 +- .../DataStreamGroupWindowAggregate.scala | 6 +- .../datastream/DataStreamOverAggregate.scala | 8 +- .../nodes/datastream/DataStreamValues.scala | 4 +- .../table/runtime/aggregate/AggregateUtil.scala | 22 +- .../expressions/utils/ExpressionTestBase.scala | 4 +- 23 files changed, 872 insertions(+), 689 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 48e33cc..252b0eb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} -import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction} +import org.apache.flink.table.codegen.{FunctionCodeGenerator, ExpressionReducer, GeneratedFunction} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.AggregateFunction @@ -790,7 +790,7 @@ abstract class TableEnvironment(val config: TableConfig) { } // code generate MapFunction - val generator = new CodeGenerator( + val generator = new FunctionCodeGenerator( config, false, inputTypeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala new file mode 100644 index 0000000..680eb44 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import java.lang.reflect.ParameterizedType +import java.lang.{Iterable => JIterable} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.codegen.CodeGenUtils.newName +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString} +import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, SingleElementIterable} + +/** + * A code generator for generating [[GeneratedAggregations]]. 
+ * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input type information about the input of the Function + */ +class AggregationCodeGenerator( + config: TableConfig, + nullableInput: Boolean, + input: TypeInformation[_ <: Any]) + extends CodeGenerator(config, nullableInput, input) { + + /** + * Generates a [[org.apache.flink.table.runtime.aggregate.GeneratedAggregations]] that can be + * passed to a Java compiler. + * + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class identifier. + * @param generator The code generator instance + * @param physicalInputTypes Physical input row types + * @param aggregates All aggregate functions + * @param aggFields Indexes of the input fields for all aggregate functions + * @param aggMapping The mapping of aggregates to output fields + * @param partialResults A flag defining whether final or partial results (accumulators) are set + * to the output row. + * @param fwdMapping The mapping of input fields to output fields + * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we + * assume that both rows have the accumulators at the same position. + * @param constantFlags An optional parameter to define where to set constant boolean flags in + * the output row. + * @param outputArity The number of fields in the output row. 
+ * @param needRetract a flag to indicate if the aggregate needs the retract method + * @param needMerge a flag to indicate if the aggregate needs the merge method + * @param needReset a flag to indicate if the aggregate needs the resetAccumulator method + * + * @return A GeneratedAggregationsFunction + */ + def generateAggregations( + name: String, + generator: CodeGenerator, + physicalInputTypes: Seq[TypeInformation[_]], + aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], + aggFields: Array[Array[Int]], + aggMapping: Array[Int], + partialResults: Boolean, + fwdMapping: Array[Int], + mergeMapping: Option[Array[Int]], + constantFlags: Option[Array[(Int, Boolean)]], + outputArity: Int, + needRetract: Boolean, + needMerge: Boolean, + needReset: Boolean) + : GeneratedAggregationsFunction = { + + // get unique function name + val funcName = newName(name) + // register UDAGGs + val aggs = aggregates.map(a => generator.addReusableFunction(a)) + // get java types of accumulators + val accTypeClasses = aggregates.map { a => + a.getClass.getMethod("createAccumulator").getReturnType + } + val accTypes = accTypeClasses.map(_.getCanonicalName) + + // get java classes of input fields + val javaClasses = physicalInputTypes.map(t => t.getTypeClass) + // get parameter lists for aggregation functions + val parameters = aggFields.map { inFields => + val fields = for (f <- inFields) yield + s"(${javaClasses(f).getCanonicalName}) input.getField($f)" + fields.mkString(", ") + } + val methodSignaturesList = aggFields.map { + inFields => for (f <- inFields) yield javaClasses(f) + } + + // check and validate the needed methods + aggregates.zipWithIndex.map { + case (a, i) => { + getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i)) + .getOrElse( + throw new CodeGenException( + s"No matching accumulate method found for AggregateFunction " + + s"'${a.getClass.getCanonicalName}'" + + s"with parameters 
'${signatureToString(methodSignaturesList(i))}'.") + ) + + if (needRetract) { + getUserDefinedMethod(a, "retract", Array(accTypeClasses(i)) ++ methodSignaturesList(i)) + .getOrElse( + throw new CodeGenException( + s"No matching retract method found for AggregateFunction " + + s"'${a.getClass.getCanonicalName}'" + + s"with parameters '${signatureToString(methodSignaturesList(i))}'.") + ) + } + + if (needMerge) { + val methods = + getUserDefinedMethod(a, "merge", Array(accTypeClasses(i), classOf[JIterable[Any]])) + .getOrElse( + throw new CodeGenException( + s"No matching merge method found for AggregateFunction " + + s"${a.getClass.getCanonicalName}'.") + ) + + var iterableTypeClass = methods.getGenericParameterTypes.apply(1) + .asInstanceOf[ParameterizedType].getActualTypeArguments.apply(0) + // further extract iterableTypeClass if the accumulator has generic type + iterableTypeClass match { + case impl: ParameterizedType => iterableTypeClass = impl.getRawType + case _ => + } + + if (iterableTypeClass != accTypeClasses(i)) { + throw new CodeGenException( + s"merge method in AggregateFunction ${a.getClass.getCanonicalName} does not have " + + s"the correct Iterable type. Actually: ${iterableTypeClass.toString}. 
" + + s"Expected: ${accTypeClasses(i).toString}") + } + } + + if (needReset) { + getUserDefinedMethod(a, "resetAccumulator", Array(accTypeClasses(i))) + .getOrElse( + throw new CodeGenException( + s"No matching resetAccumulator method found for " + + s"aggregate ${a.getClass.getCanonicalName}'.") + ) + } + } + } + + def genSetAggregationResults: String = { + + val sig: String = + j""" + | public final void setAggregationResults( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row output)""".stripMargin + + val setAggs: String = { + for (i <- aggs.indices) yield + + if (partialResults) { + j""" + | output.setField( + | ${aggMapping(i)}, + | (${accTypes(i)}) accs.getField($i));""".stripMargin + } else { + j""" + | org.apache.flink.table.functions.AggregateFunction baseClass$i = + | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; + | + | output.setField( + | ${aggMapping(i)}, + | baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin + } + }.mkString("\n") + + j""" + |$sig { + |$setAggs + | }""".stripMargin + } + + def genAccumulate: String = { + + val sig: String = + j""" + | public final void accumulate( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input)""".stripMargin + + val accumulate: String = { + for (i <- aggs.indices) yield + j""" + | ${aggs(i)}.accumulate( + | ((${accTypes(i)}) accs.getField($i)), + | ${parameters(i)});""".stripMargin + }.mkString("\n") + + j"""$sig { + |$accumulate + | }""".stripMargin + } + + def genRetract: String = { + + val sig: String = + j""" + | public final void retract( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input)""".stripMargin + + val retract: String = { + for (i <- aggs.indices) yield + j""" + | ${aggs(i)}.retract( + | ((${accTypes(i)}) accs.getField($i)), + | ${parameters(i)});""".stripMargin + }.mkString("\n") + + if (needRetract) { + j""" + |$sig { + |$retract + | }""".stripMargin + } else { + j""" + |$sig { + | 
}""".stripMargin + } + } + + def genCreateAccumulators: String = { + + val sig: String = + j""" + | public final org.apache.flink.types.Row createAccumulators() + | """.stripMargin + val init: String = + j""" + | org.apache.flink.types.Row accs = + | new org.apache.flink.types.Row(${aggs.length});""" + .stripMargin + val create: String = { + for (i <- aggs.indices) yield + j""" + | accs.setField( + | $i, + | ${aggs(i)}.createAccumulator());""" + .stripMargin + }.mkString("\n") + val ret: String = + j""" + | return accs;""" + .stripMargin + + j"""$sig { + |$init + |$create + |$ret + | }""".stripMargin + } + + def genSetForwardedFields: String = { + + val sig: String = + j""" + | public final void setForwardedFields( + | org.apache.flink.types.Row input, + | org.apache.flink.types.Row output) + | """.stripMargin + + val forward: String = { + for (i <- fwdMapping.indices if fwdMapping(i) >= 0) yield + { + j""" + | output.setField( + | $i, + | input.getField(${fwdMapping(i)}));""" + .stripMargin + } + }.mkString("\n") + + j"""$sig { + |$forward + | }""".stripMargin + } + + def genSetConstantFlags: String = { + + val sig: String = + j""" + | public final void setConstantFlags(org.apache.flink.types.Row output) + | """.stripMargin + + val setFlags: String = if (constantFlags.isDefined) { + { + for (cf <- constantFlags.get) yield { + j""" + | output.setField(${cf._1}, ${if (cf._2) "true" else "false"});""" + .stripMargin + } + }.mkString("\n") + } else { + "" + } + + j"""$sig { + |$setFlags + | }""".stripMargin + } + + def genCreateOutputRow: String = { + j""" + | public final org.apache.flink.types.Row createOutputRow() { + | return new org.apache.flink.types.Row($outputArity); + | }""".stripMargin + } + + def genMergeAccumulatorsPair: String = { + + val mapping = mergeMapping.getOrElse(aggs.indices.toArray) + + val sig: String = + j""" + | public final org.apache.flink.types.Row mergeAccumulatorsPair( + | org.apache.flink.types.Row a, + | org.apache.flink.types.Row b) + 
""".stripMargin + val merge: String = { + for (i <- aggs.indices) yield + j""" + | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i); + | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)}); + | accIt$i.setElement(bAcc$i); + | ${aggs(i)}.merge(aAcc$i, accIt$i); + | a.setField($i, aAcc$i); + """.stripMargin + }.mkString("\n") + val ret: String = + j""" + | return a; + """.stripMargin + + if (needMerge) { + j""" + |$sig { + |$merge + |$ret + | }""".stripMargin + } else { + j""" + |$sig { + |$ret + | }""".stripMargin + } + } + + def genMergeList: String = { + { + val singleIterableClass = classOf[SingleElementIterable[_]].getCanonicalName + for (i <- accTypes.indices) yield + j""" + | private final $singleIterableClass<${accTypes(i)}> accIt$i = + | new $singleIterableClass<${accTypes(i)}>(); + """.stripMargin + }.mkString("\n") + } + + def genResetAccumulator: String = { + + val sig: String = + j""" + | public final void resetAccumulator( + | org.apache.flink.types.Row accs)""".stripMargin + + val reset: String = { + for (i <- aggs.indices) yield + j""" + | ${aggs(i)}.resetAccumulator( + | ((${accTypes(i)}) accs.getField($i)));""".stripMargin + }.mkString("\n") + + if (needReset) { + j"""$sig { + |$reset + | }""".stripMargin + } else { + j"""$sig { + | }""".stripMargin + } + } + + val generatedAggregationsClass = classOf[GeneratedAggregations].getCanonicalName + var funcCode = + j""" + |public final class $funcName extends $generatedAggregationsClass { + | + | ${reuseMemberCode()} + | $genMergeList + | public $funcName() throws Exception { + | ${reuseInitCode()} + | } + | ${reuseConstructorCode(funcName)} + | + """.stripMargin + + funcCode += genSetAggregationResults + "\n" + funcCode += genAccumulate + "\n" + funcCode += genRetract + "\n" + funcCode += genCreateAccumulators + "\n" + funcCode += genSetForwardedFields + "\n" + funcCode += genSetConstantFlags + "\n" + funcCode += genCreateOutputRow + "\n" + funcCode += genMergeAccumulatorsPair + 
"\n" + funcCode += genResetAccumulator + "\n" + funcCode += "}" + + GeneratedAggregationsFunction(funcName, funcCode) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index e8bcdcf..af16b51 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.codegen -import java.lang.reflect.ParameterizedType -import java.lang.{Iterable => JIterable} import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.avatica.util.DateTimeUtils @@ -28,25 +26,21 @@ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.flink.api.common.functions._ -import org.apache.flink.api.common.io.GenericInputFormat import org.apache.flink.api.common.typeinfo._ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} -import org.apache.flink.table.codegen.Indenter.toISC import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator} 
import org.apache.flink.table.codegen.calls.ScalarOperators._ import org.apache.flink.table.functions.sql.ScalarSqlFunctions import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString} -import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction} -import org.apache.flink.table.runtime.TableFunctionCollector +import org.apache.flink.table.functions.{FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction} import org.apache.flink.table.typeutils.TypeCheckUtils._ import org.apache.flink.types.Row @@ -54,7 +48,9 @@ import scala.collection.JavaConversions._ import scala.collection.mutable /** - * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s. + * [[CodeGenerator]] is the base code generator for generating Flink + * [[org.apache.flink.api.common.functions.Function]]s. + * It is responsible for expression generation and tracks the context (member variables etc). * * @param config configuration that determines runtime behavior * @param nullableInput input(s) can be null. @@ -65,7 +61,7 @@ import scala.collection.mutable * @param input2FieldMapping additional mapping information for input2 * (e.g. 
POJO types have no deterministic field order and some input fields might not be read) */ -class CodeGenerator( +abstract class CodeGenerator( config: TableConfig, nullableInput: Boolean, input1: TypeInformation[_ <: Any], @@ -95,12 +91,12 @@ class CodeGenerator( case _ => // ok } - private val input1Mapping = input1FieldMapping match { + protected val input1Mapping: Array[Int] = input1FieldMapping match { case Some(mapping) => mapping case _ => (0 until input1.getArity).toArray } - private val input2Mapping = input2FieldMapping match { + protected val input2Mapping: Array[Int] = input2FieldMapping match { case Some(mapping) => mapping case _ => input2 match { case Some(input) => (0 until input.getArity).toArray @@ -108,31 +104,6 @@ class CodeGenerator( } } - /** - * A code generator for generating unary Flink - * [[org.apache.flink.api.common.functions.Function]]s with one input. - * - * @param config configuration that determines runtime behavior - * @param nullableInput input(s) can be null. - * @param input type information about the input of the Function - * @param inputFieldMapping additional mapping information necessary for input - * (e.g. POJO types have no deterministic field order and some input fields might not be read) - */ - def this( - config: TableConfig, - nullableInput: Boolean, - input: TypeInformation[Any], - inputFieldMapping: Array[Int]) = - this(config, nullableInput, input, None, Some(inputFieldMapping)) - - /** - * A code generator for generating Flink input formats. 
- * - * @param config configuration that determines runtime behavior - */ - def this(config: TableConfig) = - this(config, false, new RowTypeInfo(), None, None) - // set of member statements that will be added only once // we use a LinkedHashSet to keep the insertion order private val reusableMemberStatements = mutable.LinkedHashSet[String]() @@ -261,604 +232,6 @@ class CodeGenerator( } /** - * Generates a [[org.apache.flink.table.runtime.aggregate.GeneratedAggregations]] that can be - * passed to a Java compiler. - * - * @param name Class name of the function. - * Does not need to be unique but has to be a valid Java class identifier. - * @param generator The code generator instance - * @param physicalInputTypes Physical input row types - * @param aggregates All aggregate functions - * @param aggFields Indexes of the input fields for all aggregate functions - * @param aggMapping The mapping of aggregates to output fields - * @param partialResults A flag defining whether final or partial results (accumulators) are set - * to the output row. - * @param fwdMapping The mapping of input fields to output fields - * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we - * assume that both rows have the accumulators at the same position. - * @param constantFlags An optional parameter to define where to set constant boolean flags in - * the output row. - * @param outputArity The number of fields in the output row. 
- * @param needRetract a flag to indicate if the aggregate needs the retract method - * @param needMerge a flag to indicate if the aggregate needs the merge method - * @param needReset a flag to indicate if the aggregate needs the resetAccumulator method - * - * @return A GeneratedAggregationsFunction - */ - def generateAggregations( - name: String, - generator: CodeGenerator, - physicalInputTypes: Seq[TypeInformation[_]], - aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], - aggFields: Array[Array[Int]], - aggMapping: Array[Int], - partialResults: Boolean, - fwdMapping: Array[Int], - mergeMapping: Option[Array[Int]], - constantFlags: Option[Array[(Int, Boolean)]], - outputArity: Int, - needRetract: Boolean, - needMerge: Boolean, - needReset: Boolean) - : GeneratedAggregationsFunction = { - - // get unique function name - val funcName = newName(name) - // register UDAGGs - val aggs = aggregates.map(a => generator.addReusableFunction(a)) - // get java types of accumulators - val accTypeClasses = aggregates.map { a => - a.getClass.getMethod("createAccumulator").getReturnType - } - val accTypes = accTypeClasses.map(_.getCanonicalName) - - // get java classes of input fields - val javaClasses = physicalInputTypes.map(t => t.getTypeClass) - // get parameter lists for aggregation functions - val parameters = aggFields.map { inFields => - val fields = for (f <- inFields) yield - s"(${javaClasses(f).getCanonicalName}) input.getField($f)" - fields.mkString(", ") - } - val methodSignaturesList = aggFields.map { - inFields => for (f <- inFields) yield javaClasses(f) - } - - // check and validate the needed methods - aggregates.zipWithIndex.map { - case (a, i) => { - getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i)) - .getOrElse( - throw new CodeGenException( - s"No matching accumulate method found for AggregateFunction " + - s"'${a.getClass.getCanonicalName}'" + - s"with parameters 
'${signatureToString(methodSignaturesList(i))}'.") - ) - - if (needRetract) { - getUserDefinedMethod(a, "retract", Array(accTypeClasses(i)) ++ methodSignaturesList(i)) - .getOrElse( - throw new CodeGenException( - s"No matching retract method found for AggregateFunction " + - s"'${a.getClass.getCanonicalName}'" + - s"with parameters '${signatureToString(methodSignaturesList(i))}'.") - ) - } - - if (needMerge) { - val methods = - getUserDefinedMethod(a, "merge", Array(accTypeClasses(i), classOf[JIterable[Any]])) - .getOrElse( - throw new CodeGenException( - s"No matching merge method found for AggregateFunction " + - s"${a.getClass.getCanonicalName}'.") - ) - - var iterableTypeClass = methods.getGenericParameterTypes.apply(1) - .asInstanceOf[ParameterizedType].getActualTypeArguments.apply(0) - // further extract iterableTypeClass if the accumulator has generic type - iterableTypeClass match { - case impl: ParameterizedType => iterableTypeClass = impl.getRawType - case _ => - } - - if (iterableTypeClass != accTypeClasses(i)) { - throw new CodeGenException( - s"merge method in AggregateFunction ${a.getClass.getCanonicalName} does not have " + - s"the correct Iterable type. Actually: ${iterableTypeClass.toString}. 
" + - s"Expected: ${accTypeClasses(i).toString}") - } - } - - if (needReset) { - getUserDefinedMethod(a, "resetAccumulator", Array(accTypeClasses(i))) - .getOrElse( - throw new CodeGenException( - s"No matching resetAccumulator method found for " + - s"aggregate ${a.getClass.getCanonicalName}'.") - ) - } - } - } - - def genSetAggregationResults: String = { - - val sig: String = - j""" - | public final void setAggregationResults( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row output)""".stripMargin - - val setAggs: String = { - for (i <- aggs.indices) yield - - if (partialResults) { - j""" - | output.setField( - | ${aggMapping(i)}, - | (${accTypes(i)}) accs.getField($i));""".stripMargin - } else { - j""" - | org.apache.flink.table.functions.AggregateFunction baseClass$i = - | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)}; - | - | output.setField( - | ${aggMapping(i)}, - | baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin - } - }.mkString("\n") - - j""" - |$sig { - |$setAggs - | }""".stripMargin - } - - def genAccumulate: String = { - - val sig: String = - j""" - | public final void accumulate( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row input)""".stripMargin - - val accumulate: String = { - for (i <- aggs.indices) yield - j""" - | ${aggs(i)}.accumulate( - | ((${accTypes(i)}) accs.getField($i)), - | ${parameters(i)});""".stripMargin - }.mkString("\n") - - j"""$sig { - |$accumulate - | }""".stripMargin - } - - def genRetract: String = { - - val sig: String = - j""" - | public final void retract( - | org.apache.flink.types.Row accs, - | org.apache.flink.types.Row input)""".stripMargin - - val retract: String = { - for (i <- aggs.indices) yield - j""" - | ${aggs(i)}.retract( - | ((${accTypes(i)}) accs.getField($i)), - | ${parameters(i)});""".stripMargin - }.mkString("\n") - - if (needRetract) { - j""" - |$sig { - |$retract - | }""".stripMargin - } else { - j""" - |$sig { - | 
}""".stripMargin - } - } - - def genCreateAccumulators: String = { - - val sig: String = - j""" - | public final org.apache.flink.types.Row createAccumulators() - | """.stripMargin - val init: String = - j""" - | org.apache.flink.types.Row accs = - | new org.apache.flink.types.Row(${aggs.length});""" - .stripMargin - val create: String = { - for (i <- aggs.indices) yield - j""" - | accs.setField( - | $i, - | ${aggs(i)}.createAccumulator());""" - .stripMargin - }.mkString("\n") - val ret: String = - j""" - | return accs;""" - .stripMargin - - j"""$sig { - |$init - |$create - |$ret - | }""".stripMargin - } - - def genSetForwardedFields: String = { - - val sig: String = - j""" - | public final void setForwardedFields( - | org.apache.flink.types.Row input, - | org.apache.flink.types.Row output) - | """.stripMargin - - val forward: String = { - for (i <- fwdMapping.indices if fwdMapping(i) >= 0) yield - { - j""" - | output.setField( - | $i, - | input.getField(${fwdMapping(i)}));""" - .stripMargin - } - }.mkString("\n") - - j"""$sig { - |$forward - | }""".stripMargin - } - - def genSetConstantFlags: String = { - - val sig: String = - j""" - | public final void setConstantFlags(org.apache.flink.types.Row output) - | """.stripMargin - - val setFlags: String = if (constantFlags.isDefined) { - { - for (cf <- constantFlags.get) yield { - j""" - | output.setField(${cf._1}, ${if (cf._2) "true" else "false"});""" - .stripMargin - } - }.mkString("\n") - } else { - "" - } - - j"""$sig { - |$setFlags - | }""".stripMargin - } - - def genCreateOutputRow: String = { - j""" - | public final org.apache.flink.types.Row createOutputRow() { - | return new org.apache.flink.types.Row($outputArity); - | }""".stripMargin - } - - def genMergeAccumulatorsPair: String = { - - val mapping = mergeMapping.getOrElse(aggs.indices.toArray) - - val sig: String = - j""" - | public final org.apache.flink.types.Row mergeAccumulatorsPair( - | org.apache.flink.types.Row a, - | org.apache.flink.types.Row b) - 
""".stripMargin - val merge: String = { - for (i <- aggs.indices) yield - j""" - | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i); - | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)}); - | accIt$i.setElement(bAcc$i); - | ${aggs(i)}.merge(aAcc$i, accIt$i); - | a.setField($i, aAcc$i); - """.stripMargin - }.mkString("\n") - val ret: String = - j""" - | return a; - """.stripMargin - - if (needMerge) { - j""" - |$sig { - |$merge - |$ret - | }""".stripMargin - } else { - j""" - |$sig { - |$ret - | }""".stripMargin - } - } - - def genMergeList: String = { - { - val singleIterableClass = "org.apache.flink.table.runtime.aggregate.SingleElementIterable" - for (i <- accTypes.indices) yield - j""" - | private final $singleIterableClass<${accTypes(i)}> accIt$i = - | new $singleIterableClass<${accTypes(i)}>(); - """.stripMargin - }.mkString("\n") - } - - def genResetAccumulator: String = { - - val sig: String = - j""" - | public final void resetAccumulator( - | org.apache.flink.types.Row accs)""".stripMargin - - val reset: String = { - for (i <- aggs.indices) yield - j""" - | ${aggs(i)}.resetAccumulator( - | ((${accTypes(i)}) accs.getField($i)));""".stripMargin - }.mkString("\n") - - if (needReset) { - j"""$sig { - |$reset - | }""".stripMargin - } else { - j"""$sig { - | }""".stripMargin - } - } - - var funcCode = - j""" - |public final class $funcName - | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { - | - | ${reuseMemberCode()} - | $genMergeList - | public $funcName() throws Exception { - | ${reuseInitCode()} - | } - | ${reuseConstructorCode(funcName)} - | - """.stripMargin - - funcCode += genSetAggregationResults + "\n" - funcCode += genAccumulate + "\n" - funcCode += genRetract + "\n" - funcCode += genCreateAccumulators + "\n" - funcCode += genSetForwardedFields + "\n" - funcCode += genSetConstantFlags + "\n" - funcCode += genCreateOutputRow + "\n" - funcCode += genMergeAccumulatorsPair + "\n" - funcCode += 
genResetAccumulator + "\n" - funcCode += "}" - - GeneratedAggregationsFunction(funcName, funcCode) - } - - /** - * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java - * compiler. - * - * @param name Class name of the Function. Must not be unique but has to be a valid Java class - * identifier. - * @param clazz Flink Function to be generated. - * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or - * output record can be accessed via the given term methods. - * @param returnType expected return type - * @tparam F Flink Function to be generated. - * @tparam T Return type of the Flink Function. - * @return instance of GeneratedFunction - */ - def generateFunction[F <: Function, T <: Any]( - name: String, - clazz: Class[F], - bodyCode: String, - returnType: TypeInformation[T]) - : GeneratedFunction[F, T] = { - val funcName = newName(name) - - // Janino does not support generics, that's why we need - // manual casting here - val samHeader = - // FlatMapFunction - if (clazz == classOf[FlatMapFunction[_, _]]) { - val baseClass = classOf[RichFlatMapFunction[_, _]] - val inputTypeTerm = boxedTypeTermForTypeInfo(input1) - (baseClass, - s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) - } - - // MapFunction - else if (clazz == classOf[MapFunction[_, _]]) { - val baseClass = classOf[RichMapFunction[_, _]] - val inputTypeTerm = boxedTypeTermForTypeInfo(input1) - (baseClass, - "Object map(Object _in1)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) - } - - // FlatJoinFunction - else if (clazz == classOf[FlatJoinFunction[_, _, _]]) { - val baseClass = classOf[RichFlatJoinFunction[_, _, _]] - val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1) - val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse( - throw new CodeGenException("Input 2 for FlatJoinFunction should not be null"))) - 
(baseClass, - s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)", - List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", - s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) - } - - // ProcessFunction - else if (clazz == classOf[ProcessFunction[_, _]]) { - val baseClass = classOf[ProcessFunction[_, _]] - val inputTypeTerm = boxedTypeTermForTypeInfo(input1) - (baseClass, - s"void processElement(Object _in1, " + - s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," + - s"org.apache.flink.util.Collector $collectorTerm)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) - } - else { - // TODO more functions - throw new CodeGenException("Unsupported Function.") - } - - val funcCode = j""" - public class $funcName - extends ${samHeader._1.getCanonicalName} { - - ${reuseMemberCode()} - - public $funcName() throws Exception { - ${reuseInitCode()} - } - - ${reuseConstructorCode(funcName)} - - @Override - public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { - ${reuseOpenCode()} - } - - @Override - public ${samHeader._2} throws Exception { - ${samHeader._3.mkString("\n")} - ${reusePerRecordCode()} - ${reuseInputUnboxingCode()} - $bodyCode - } - - @Override - public void close() throws Exception { - ${reuseCloseCode()} - } - } - """.stripMargin - - GeneratedFunction(funcName, returnType, funcCode) - } - - /** - * Generates a values input format that can be passed to Java compiler. - * - * @param name Class name of the input format. Must not be unique but has to be a - * valid Java class identifier. - * @param records code for creating records - * @param returnType expected return type - * @tparam T Return type of the Flink Function. 
- * @return instance of GeneratedFunction - */ - def generateValuesInputFormat[T <: Row]( - name: String, - records: Seq[String], - returnType: TypeInformation[T]) - : GeneratedInput[GenericInputFormat[T], T] = { - val funcName = newName(name) - - addReusableOutRecord(returnType) - - val funcCode = j""" - public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} { - - private int nextIdx = 0; - - ${reuseMemberCode()} - - public $funcName() throws Exception { - ${reuseInitCode()} - } - - @Override - public boolean reachedEnd() throws java.io.IOException { - return nextIdx >= ${records.length}; - } - - @Override - public Object nextRecord(Object reuse) { - switch (nextIdx) { - ${records.zipWithIndex.map { case (r, i) => - s""" - |case $i: - | $r - |break; - """.stripMargin - }.mkString("\n")} - } - nextIdx++; - return $outRecordTerm; - } - } - """.stripMargin - - GeneratedInput(funcName, returnType, funcCode) - } - - /** - * Generates a [[TableFunctionCollector]] that can be passed to Java compiler. - * - * @param name Class name of the table function collector. Must not be unique but has to be a - * valid Java class identifier. 
- * @param bodyCode body code for the collector method - * @param collectedType The type information of the element collected by the collector - * @return instance of GeneratedCollector - */ - def generateTableFunctionCollector( - name: String, - bodyCode: String, - collectedType: TypeInformation[Any]) - : GeneratedCollector = { - - val className = newName(name) - val input1TypeClass = boxedTypeTermForTypeInfo(input1) - val input2TypeClass = boxedTypeTermForTypeInfo(collectedType) - - val funcCode = j""" - public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} { - - ${reuseMemberCode()} - - public $className() throws Exception { - ${reuseInitCode()} - } - - @Override - public void collect(Object record) throws Exception { - super.collect(record); - $input1TypeClass $input1Term = ($input1TypeClass) getInput(); - $input2TypeClass $input2Term = ($input2TypeClass) record; - ${reuseInputUnboxingCode()} - $bodyCode - } - - @Override - public void close() { - } - } - """.stripMargin - - GeneratedCollector(className, funcCode) - } - - /** * Generates an expression that converts the first input (and second input) into the given type. * If two inputs are converted, the second input is appended. If objects or variables can * be reused, they will be added to reusable code sections internally. 
The evaluation result http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala new file mode 100644 index 0000000..70f6638 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName} +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.runtime.TableFunctionCollector + + +/** + * A code generator for generating [[org.apache.flink.util.Collector]]s. 
+ * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input1 type information about the first input of the Function + * @param input2 type information about the second input if the Function is binary + * @param input1FieldMapping additional mapping information for input1 + * (e.g. POJO types have no deterministic field order and some input fields might not be read) + * @param input2FieldMapping additional mapping information for input2 + * (e.g. POJO types have no deterministic field order and some input fields might not be read) + */ +class CollectorCodeGenerator( + config: TableConfig, + nullableInput: Boolean, + input1: TypeInformation[_ <: Any], + input2: Option[TypeInformation[_ <: Any]] = None, + input1FieldMapping: Option[Array[Int]] = None, + input2FieldMapping: Option[Array[Int]] = None) + extends CodeGenerator( + config, + nullableInput, + input1, + input2, + input1FieldMapping, + input2FieldMapping) { + + /** + * Generates a [[TableFunctionCollector]] that can be passed to Java compiler. + * + * @param name Class name of the table function collector. Must not be unique but has to be a + * valid Java class identifier. 
+ * @param bodyCode body code for the collector method + * @param collectedType The type information of the element collected by the collector + * @return instance of GeneratedCollector + */ + def generateTableFunctionCollector( + name: String, + bodyCode: String, + collectedType: TypeInformation[Any]) + : GeneratedCollector = { + + val className = newName(name) + val input1TypeClass = boxedTypeTermForTypeInfo(input1) + val input2TypeClass = boxedTypeTermForTypeInfo(collectedType) + + val funcCode = j""" + public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} { + + ${reuseMemberCode()} + + public $className() throws Exception { + ${reuseInitCode()} + } + + @Override + public void collect(Object record) throws Exception { + super.collect(record); + $input1TypeClass $input1Term = ($input1TypeClass) getInput(); + $input2TypeClass $input2Term = ($input2TypeClass) record; + ${reuseInputUnboxingCode()} + $bodyCode + } + + @Override + public void close() { + } + } + """.stripMargin + + GeneratedCollector(className, funcCode) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala index b7e1335..cf36417 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala @@ -33,7 +33,7 @@ import org.apache.flink.types.Row import scala.collection.JavaConverters._ /** - * Evaluates constant expressions using Flink's [[CodeGenerator]]. + * Evaluates constant expressions using Flink's [[FunctionCodeGenerator]]. 
*/ class ExpressionReducer(config: TableConfig) extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] { @@ -77,7 +77,7 @@ class ExpressionReducer(config: TableConfig) val resultType = new RowTypeInfo(literalTypes: _*) // generate MapFunction - val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO) + val generator = new FunctionCodeGenerator(config, false, EMPTY_ROW_INFO) val result = generator.generateResultExpression( resultType, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala new file mode 100644 index 0000000..e86c4ab --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.codegen + +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName} +import org.apache.flink.table.codegen.Indenter.toISC + +/** + * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s. + * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], [[ProcessFunction]]. + * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. + * @param input1 type information about the first input of the Function + * @param input2 type information about the second input if the Function is binary + * @param input1FieldMapping additional mapping information for input1 + * (e.g. POJO types have no deterministic field order and some input fields might not be read) + * @param input2FieldMapping additional mapping information for input2 + * (e.g. POJO types have no deterministic field order and some input fields might not be read) + */ +class FunctionCodeGenerator( + config: TableConfig, + nullableInput: Boolean, + input1: TypeInformation[_ <: Any], + input2: Option[TypeInformation[_ <: Any]] = None, + input1FieldMapping: Option[Array[Int]] = None, + input2FieldMapping: Option[Array[Int]] = None) + extends CodeGenerator( + config, + nullableInput, + input1, + input2, + input1FieldMapping, + input2FieldMapping) { + + /** + * A code generator for generating unary Flink + * [[org.apache.flink.api.common.functions.Function]]s with one input. + * + * @param config configuration that determines runtime behavior + * @param nullableInput input(s) can be null. 
+ * @param input type information about the input of the Function + * @param inputFieldMapping additional mapping information necessary for input + * (e.g. POJO types have no deterministic field order and some input fields might not be read) + */ + def this( + config: TableConfig, + nullableInput: Boolean, + input: TypeInformation[Any], + inputFieldMapping: Array[Int]) = + this(config, nullableInput, input, None, Some(inputFieldMapping)) + + /** + * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java + * compiler. + * + * @param name Class name of the Function. Must not be unique but has to be a valid Java class + * identifier. + * @param clazz Flink Function to be generated. + * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or + * output record can be accessed via the given term methods. + * @param returnType expected return type + * @tparam F Flink Function to be generated. + * @tparam T Return type of the Flink Function. 
+ * @return instance of GeneratedFunction + */ + def generateFunction[F <: Function, T <: Any]( + name: String, + clazz: Class[F], + bodyCode: String, + returnType: TypeInformation[T]) + : GeneratedFunction[F, T] = { + val funcName = newName(name) + + // Janino does not support generics, that's why we need + // manual casting here + val samHeader = + // FlatMapFunction + if (clazz == classOf[FlatMapFunction[_, _]]) { + val baseClass = classOf[RichFlatMapFunction[_, _]] + val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + (baseClass, + s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + } + + // MapFunction + else if (clazz == classOf[MapFunction[_, _]]) { + val baseClass = classOf[RichMapFunction[_, _]] + val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + (baseClass, + "Object map(Object _in1)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + } + + // FlatJoinFunction + else if (clazz == classOf[FlatJoinFunction[_, _, _]]) { + val baseClass = classOf[RichFlatJoinFunction[_, _, _]] + val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1) + val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse( + throw new CodeGenException("Input 2 for FlatJoinFunction should not be null"))) + (baseClass, + s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)", + List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", + s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) + } + + // ProcessFunction + else if (clazz == classOf[ProcessFunction[_, _]]) { + val baseClass = classOf[ProcessFunction[_, _]] + val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + (baseClass, + s"void processElement(Object _in1, " + + s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," + + s"org.apache.flink.util.Collector $collectorTerm)", + List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + } + 
else { + // TODO more functions + throw new CodeGenException("Unsupported Function.") + } + + val funcCode = j""" + public class $funcName + extends ${samHeader._1.getCanonicalName} { + + ${reuseMemberCode()} + + public $funcName() throws Exception { + ${reuseInitCode()} + } + + ${reuseConstructorCode(funcName)} + + @Override + public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { + ${reuseOpenCode()} + } + + @Override + public ${samHeader._2} throws Exception { + ${samHeader._3.mkString("\n")} + ${reusePerRecordCode()} + ${reuseInputUnboxingCode()} + $bodyCode + } + + @Override + public void close() throws Exception { + ${reuseCloseCode()} + } + } + """.stripMargin + + GeneratedFunction(funcName, returnType, funcCode) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala new file mode 100644 index 0000000..6d6e1b6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.codegen + +import org.apache.flink.api.common.io.GenericInputFormat +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenUtils.newName +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.types.Row + +/** + * A code generator for generating Flink [[GenericInputFormat]]s. + * + * @param config configuration that determines runtime behavior + */ +class InputFormatCodeGenerator( + config: TableConfig) + extends CodeGenerator(config, false, new RowTypeInfo(), None, None) { + + + /** + * Generates a values input format that can be passed to Java compiler. + * + * @param name Class name of the input format. Must not be unique but has to be a + * valid Java class identifier. + * @param records code for creating records + * @param returnType expected return type + * @tparam T Return type of the Flink Function. 
+ * @return instance of GeneratedFunction + */ + def generateValuesInputFormat[T <: Row]( + name: String, + records: Seq[String], + returnType: TypeInformation[T]) + : GeneratedInput[GenericInputFormat[T], T] = { + val funcName = newName(name) + + addReusableOutRecord(returnType) + + val funcCode = j""" + public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} { + + private int nextIdx = 0; + + ${reuseMemberCode()} + + public $funcName() throws Exception { + ${reuseInitCode()} + } + + @Override + public boolean reachedEnd() throws java.io.IOException { + return nextIdx >= ${records.length}; + } + + @Override + public Object nextRecord(Object reuse) { + switch (nextIdx) { + ${records.zipWithIndex.map { case (r, i) => + s""" + |case $i: + | $r + |break; + """.stripMargin + }.mkString("\n")} + } + nextIdx++; + return $outRecordTerm; + } + } + """.stripMargin + + GeneratedInput(funcName, returnType, funcCode) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index 9b486e4..693924e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rex._ import org.apache.flink.api.common.functions.Function import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import 
org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ trait CommonCalc { private[flink] def generateFunction[T <: Function]( - generator: CodeGenerator, + generator: FunctionCodeGenerator, ruleDescription: String, inputSchema: RowSchema, returnSchema: RowSchema, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 96e1f5e..96aaf3e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -20,12 +20,12 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle} import org.apache.calcite.sql.SemiJoinType -import org.apache.flink.api.common.functions.{FlatMapFunction, Function} +import org.apache.flink.api.common.functions.Function import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE} -import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction} +import org.apache.flink.table.codegen._ import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.TableFunctionCollector @@ 
-55,7 +55,7 @@ trait CommonCorrelate { val physicalRexCall = inputSchema.mapRexNode(rexCall) - val functionGenerator = new CodeGenerator( + val functionGenerator = new FunctionCodeGenerator( config, false, inputSchema.physicalTypeInfo, @@ -123,7 +123,7 @@ trait CommonCorrelate { pojoFieldMapping: Option[Array[Int]]) : GeneratedCollector = { - val generator = new CodeGenerator( + val generator = new CollectorCodeGenerator( config, false, inputSchema.physicalTypeInfo, @@ -155,7 +155,13 @@ trait CommonCorrelate { // The generated expression is discarded. generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle)) - val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping) + val filterGenerator = new FunctionCodeGenerator( + config, + false, + udtfTypeInfo, + None, + pojoFieldMapping) + filterGenerator.input1Term = filterGenerator.input2Term val filterCondition = filterGenerator.generateExpression(condition.get) s""" http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala index 7ce73ee..996d62c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes import org.apache.flink.api.common.functions.Function import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction} import org.apache.flink.types.Row /** @@ -48,7 +48,7 @@ trait CommonScan[T] { inputFieldMapping: Option[Array[Int]] = None) : GeneratedFunction[F, Row] = { - val generator = new CodeGenerator( + val generator = new FunctionCodeGenerator( config, false, inputType, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index b53081c..37d1a51 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.runtime.aggregate.{AggregateUtil, DataSetPreAggFunction} import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair @@ -95,7 +95,7 @@ class DataSetAggregate( val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] - val generator = new CodeGenerator( + val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, inputDS.getType) http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index 9a9f738..a923acc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.FunctionCodeGenerator import org.apache.flink.table.plan.nodes.CommonCalc import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.FlatMapRunner @@ -86,7 +86,7 @@ class DataSetCalc( val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val generator = new CodeGenerator(config, false, inputDS.getType) + val generator = new FunctionCodeGenerator(config, false, inputDS.getType) val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala index e6f8ca4..a6c31d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala @@ -30,7 +30,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.{BatchTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.FunctionCodeGenerator import org.apache.flink.table.runtime.FlatJoinRunner import org.apache.flink.types.Row @@ -158,7 +158,7 @@ class DataSetJoin( throw TableException("Null check in TableConfig must be enabled for outer joins.") } - val generator = new CodeGenerator( + val generator = new FunctionCodeGenerator( config, nullCheck, leftDataSet.getType, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala index e92dec5..3c1c58b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala @@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.FunctionCodeGenerator import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner} 
import org.apache.flink.types.Row @@ -129,7 +129,7 @@ class DataSetSingleRowJoin( case _ => false } - val codeGenerator = new CodeGenerator( + val codeGenerator = new FunctionCodeGenerator( config, isOuterJoin, inputType1, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala index 948dd27..3a4ba47 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala @@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexLiteral import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.InputFormatCodeGenerator import org.apache.flink.table.runtime.io.ValuesInputFormat import org.apache.flink.types.Row @@ -72,7 +72,7 @@ class DataSetValues( val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) - val generator = new CodeGenerator(config) + val generator = new InputFormatCodeGenerator(config) // generate code for every record val generatedRecords = getTuples.asScala.map { r => http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala index 3cb872a..38de368 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala @@ -28,7 +28,7 @@ import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.expressions.ExpressionUtils._ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate @@ -109,7 +109,7 @@ class DataSetWindowAggregate( val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val generator = new CodeGenerator( + val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, inputDS.getType) @@ -147,7 +147,7 @@ class DataSetWindowAggregate( } private def createEventTimeTumblingWindowDataSet( - generator: CodeGenerator, + generator: AggregationCodeGenerator, inputDS: DataSet[Row], isTimeWindow: Boolean, isParserCaseSensitive: Boolean): DataSet[Row] = { @@ -210,7 +210,7 @@ class DataSetWindowAggregate( } private[this] def createEventTimeSessionWindowDataSet( - generator: CodeGenerator, + generator: AggregationCodeGenerator, inputDS: DataSet[Row], isParserCaseSensitive: Boolean): DataSet[Row] = { @@ -352,7 +352,7 @@ class DataSetWindowAggregate( } private def createEventTimeSlidingWindowDataSet( - generator: CodeGenerator, + generator: AggregationCodeGenerator, 
inputDS: DataSet[Row], isTimeWindow: Boolean, size: Long, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 67c5782..d626c46 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexProgram import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.FunctionCodeGenerator import org.apache.flink.table.plan.nodes.CommonCalc import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.CRowProcessRunner @@ -93,7 +93,7 @@ class DataStreamCalc( val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val generator = new CodeGenerator(config, false, inputSchema.physicalTypeInfo) + val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo) val genFunction = generateFunction( generator, http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index 33bb8cc..12694fc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -18,13 +18,12 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.java.functions.NullByteKeySelector import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.schema.RowSchema @@ -121,7 +120,7 @@ class DataStreamGroupAggregate( val outRowType = CRowTypeInfo(schema.physicalTypeInfo) - val generator = new CodeGenerator( + val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, inputSchema.physicalTypeInfo) http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index d860cbe..c4ffdb1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -26,11 +26,11 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger -import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow} +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.expressions.ExpressionUtils._ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate @@ -162,7 +162,7 @@ class DataStreamGroupWindowAggregate( s"select: ($aggString)" val nonKeyedAggOpName = s"window: ($window), select: ($aggString)" - val generator = new CodeGenerator( + val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, inputSchema.physicalTypeInfo) http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index c03dac6..34a7fd8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.OverAggregate import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.api.java.functions.NullByteKeySelector -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -131,7 +131,7 @@ class DataStreamOverAggregate( "excessive state size. 
You may specify a retention time of 0 to not clean up the state.") } - val generator = new CodeGenerator( + val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, inputSchema.physicalTypeInfo) @@ -200,7 +200,7 @@ class DataStreamOverAggregate( def createUnboundedAndCurrentRowOverWindow( queryConfig: StreamQueryConfig, - generator: CodeGenerator, + generator: AggregationCodeGenerator, inputDS: DataStream[CRow], isRowTimeType: Boolean, isRowsClause: Boolean): DataStream[CRow] = { @@ -252,7 +252,7 @@ class DataStreamOverAggregate( def createBoundedAndCurrentRowOverWindow( queryConfig: StreamQueryConfig, - generator: CodeGenerator, + generator: AggregationCodeGenerator, inputDS: DataStream[CRow], isRowTimeType: Boolean, isRowsClause: Boolean): DataStream[CRow] = { http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index d7c490f..1476681 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.Values import org.apache.calcite.rex.RexLiteral import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} -import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.codegen.InputFormatCodeGenerator import org.apache.flink.table.plan.schema.RowSchema import 
org.apache.flink.table.runtime.io.CRowValuesInputFormat import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -63,7 +63,7 @@ class DataStreamValues( val config = tableEnv.getConfig val returnType = CRowTypeInfo(schema.physicalTypeInfo) - val generator = new CodeGenerator(config) + val generator = new InputFormatCodeGenerator(config) // generate code for every record val generatedRecords = getTuples.asScala.map { r =>
