[FLINK-7337] [table] Efficient handling of rowtime timestamps. Use Long instead of a SQL Timestamp to represent timestamps internally.
This closes #4532. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47944b1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47944b1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47944b1b Branch: refs/heads/master Commit: 47944b1bb23136ae498971b3765a0d3fe6bf2f18 Parents: 93d0ae4 Author: twalthr <twal...@apache.org> Authored: Sat Aug 12 13:51:42 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Wed Aug 23 10:09:21 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 36 +- .../table/api/StreamTableEnvironment.scala | 245 +++++------ .../flink/table/api/TableEnvironment.scala | 73 ++-- .../calcite/RelTimeIndicatorConverter.scala | 2 +- .../codegen/AggregationCodeGenerator.scala | 25 +- .../flink/table/codegen/CodeGenUtils.scala | 29 +- .../flink/table/codegen/CodeGenerator.scala | 21 +- .../table/codegen/calls/ScalarOperators.scala | 7 +- .../table/functions/ProctimeSqlFunction.scala | 41 -- .../functions/sql/ProctimeSqlFunction.scala | 43 ++ .../utils/UserDefinedFunctionUtils.scala | 6 +- .../DataStreamGroupWindowAggregate.scala | 4 +- .../table/runtime/CRowInputMapRunner.scala | 57 --- .../runtime/CRowInputTupleOutputMapRunner.scala | 94 ---- .../flink/table/runtime/CRowMapRunner.scala | 57 +++ .../runtime/OutputRowtimeProcessFunction.scala | 58 +++ .../table/runtime/RowtimeProcessFunction.scala | 49 +++ .../TimestampSetterProcessFunction.scala | 52 --- ...WrappingTimestampSetterProcessFunction.scala | 61 --- .../table/runtime/aggregate/AggregateUtil.scala | 33 +- .../aggregate/RowTimeBoundedRangeOver.scala | 4 +- .../aggregate/RowTimeBoundedRowsOver.scala | 13 +- .../aggregate/RowTimeSortProcessFunction.scala | 15 +- .../aggregate/RowTimeUnboundedOver.scala | 17 +- .../aggregate/TimeWindowPropertyCollector.scala | 2 +- .../conversion/CRowToJavaTupleMapFunction.scala | 40 ++ 
.../conversion/CRowToJavaTupleMapRunner.scala | 64 +++ .../conversion/CRowToRowMapFunction.scala | 32 ++ .../CRowToScalaTupleMapFunction.scala | 33 ++ .../conversion/CRowToScalaTupleMapRunner.scala | 56 +++ .../table/typeutils/TimeIndicatorTypeInfo.scala | 13 +- .../runtime/harness/OverWindowHarnessTest.scala | 436 +++++++++---------- .../SortProcessFunctionHarnessTest.scala | 49 +-- .../runtime/stream/TimeAttributesITCase.scala | 10 +- .../runtime/stream/table/TableSinkITCase.scala | 4 +- 35 files changed, 952 insertions(+), 829 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 78667a2..a9d60dd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -153,28 +153,22 @@ abstract class BatchTableEnvironment( physicalTypeInfo: TypeInformation[IN], schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], - functionName: String): - Option[MapFunction[IN, OUT]] = { - - if (requestedTypeInfo.getTypeClass == classOf[Row]) { - // Row to Row, no conversion needed - None - } else { - // some type that is neither Row or CRow - - val converterFunction = generateRowConverterFunction[OUT]( - physicalTypeInfo.asInstanceOf[TypeInformation[Row]], - schema, - requestedTypeInfo, - functionName - ) - - val mapFunction = new MapRunner[IN, OUT]( - converterFunction.name, - converterFunction.code, - converterFunction.returnType) + functionName: String) + : 
Option[MapFunction[IN, OUT]] = { + + val converterFunction = generateRowConverterFunction[OUT]( + physicalTypeInfo.asInstanceOf[TypeInformation[Row]], + schema, + requestedTypeInfo, + functionName + ) - Some(mapFunction) + // add a runner if we need conversion + converterFunction.map { func => + new MapRunner[IN, OUT]( + func.name, + func.code, + func.returnType) } } http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index c4e1450..8d8cebb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -23,15 +23,15 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.plan.hep.HepMatchOrder -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType} import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType} import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import 
org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream @@ -44,12 +44,12 @@ import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetr import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable} import org.apache.flink.table.plan.util.UpdatingPlanChecker +import org.apache.flink.table.runtime.conversion._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner, WrappingTimestampSetterProcessFunction} +import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction} import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource} import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils} -import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ @@ -223,38 +223,33 @@ abstract class StreamTableEnvironment( /** * Creates a final converter that maps the internal row type to external type. * - * @param physicalTypeInfo the input of the sink + * @param inputTypeInfo the input of the sink * @param schema the input schema with correct field names (esp. for POJO field mapping) * @param requestedTypeInfo the output type of the sink * @param functionName name of the map function. Must not be unique but has to be a * valid Java class identifier. 
*/ - protected def getConversionMapper[IN, OUT]( - physicalTypeInfo: TypeInformation[IN], + protected def getConversionMapper[OUT]( + inputTypeInfo: TypeInformation[CRow], schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], - functionName: String): - MapFunction[IN, OUT] = { - - if (requestedTypeInfo.getTypeClass == classOf[Row]) { - // CRow to Row, only needs to be unwrapped - new MapFunction[CRow, Row] { - override def map(value: CRow): Row = value.row - }.asInstanceOf[MapFunction[IN, OUT]] - } else { - // Some type that is neither CRow nor Row - val converterFunction = generateRowConverterFunction[OUT]( - physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, - schema, - requestedTypeInfo, - functionName - ) + functionName: String) + : MapFunction[CRow, OUT] = { + + val converterFunction = generateRowConverterFunction[OUT]( + inputTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + schema, + requestedTypeInfo, + functionName + ) - new CRowInputMapRunner[OUT]( - converterFunction.name, - converterFunction.code, - converterFunction.returnType) - .asInstanceOf[MapFunction[IN, OUT]] + converterFunction match { + + case Some(func) => + new CRowMapRunner[OUT](func.name, func.code, func.returnType) + + case _ => + new CRowToRowMapFunction().asInstanceOf[MapFunction[CRow, OUT]] } } @@ -271,71 +266,62 @@ abstract class StreamTableEnvironment( physicalTypeInfo: TypeInformation[CRow], schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], - functionName: String): - MapFunction[CRow, OUT] = { - - requestedTypeInfo match { - - // Scala tuple - case t: CaseClassTypeInfo[_] - if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN => - - val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]] - if (reqType.getTypeClass == classOf[Row]) { - // Requested type is Row. 
Just rewrap CRow in Tuple2 - new MapFunction[CRow, (Boolean, Row)] { - override def map(cRow: CRow): (Boolean, Row) = { - (cRow.change, cRow.row) - } - }.asInstanceOf[MapFunction[CRow, OUT]] - } else { - // Use a map function to convert Row into requested type and wrap result in Tuple2 - val converterFunction = generateRowConverterFunction( - physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, - schema, - reqType, - functionName - ) - - new CRowInputScalaTupleOutputMapRunner( - converterFunction.name, - converterFunction.code, - requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]]) - .asInstanceOf[MapFunction[CRow, OUT]] + functionName: String) + : MapFunction[CRow, OUT] = requestedTypeInfo match { - } + // Scala tuple + case t: CaseClassTypeInfo[_] + if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN => - // Java tuple - case t: TupleTypeInfo[_] - if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN => - - val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]] - if (reqType.getTypeClass == classOf[Row]) { - // Requested type is Row. 
Just rewrap CRow in Tuple2 - new MapFunction[CRow, JTuple2[JBool, Row]] { - val outT = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row]) - override def map(cRow: CRow): JTuple2[JBool, Row] = { - outT.f0 = cRow.change - outT.f1 = cRow.row - outT - } - }.asInstanceOf[MapFunction[CRow, OUT]] - } else { - // Use a map function to convert Row into requested type and wrap result in Tuple2 - val converterFunction = generateRowConverterFunction( - physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, - schema, - reqType, - functionName - ) - - new CRowInputJavaTupleOutputMapRunner( - converterFunction.name, - converterFunction.code, - requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]]) - .asInstanceOf[MapFunction[CRow, OUT]] - } - } + val reqType = t.getTypeAt[Any](1) + + // convert Row into requested type and wrap result in Tuple2 + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + schema, + reqType, + functionName + ) + + converterFunction match { + + case Some(func) => + new CRowToScalaTupleMapRunner( + func.name, + func.code, + requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]] + ).asInstanceOf[MapFunction[CRow, OUT]] + + case _ => + new CRowToScalaTupleMapFunction().asInstanceOf[MapFunction[CRow, OUT]] + } + + // Java tuple + case t: TupleTypeInfo[_] + if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN => + + val reqType = t.getTypeAt[Any](1) + + // convert Row into requested type and wrap result in Tuple2 + val converterFunction = generateRowConverterFunction( + physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType, + schema, + reqType, + functionName + ) + + converterFunction match { + + case Some(func) => + new CRowToJavaTupleMapRunner( + func.name, + func.code, + requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]] + ).asInstanceOf[MapFunction[CRow, OUT]] + + case _ => + new 
CRowToJavaTupleMapFunction().asInstanceOf[MapFunction[CRow, OUT]] + } } /** @@ -733,16 +719,42 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) + val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + + // convert the input type for the conversion mapper + // the input will be changed in the OutputRowtimeProcessFunction later + val convType = if (rowtimeFields.size > 1) { + throw new TableException( + s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " + + s"the table that should be converted to a DataStream.\n" + + s"Please select the rowtime field that should be used as event-time timestamp for the " + + s"DataStream by casting all other fields to TIMESTAMP.") + } else if (rowtimeFields.size == 1) { + val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType + val convFieldTypes = origRowType.getFieldTypes.map { t => + if (FlinkTypeFactory.isRowtimeIndicatorType(t)) { + SqlTimeTypeInfo.TIMESTAMP + } else { + t + } + } + CRowTypeInfo(new RowTypeInfo(convFieldTypes, origRowType.getFieldNames)) + } else { + plan.getType + } + // convert CRow to output type - val conversion = if (withChangeFlag) { + val conversion: MapFunction[CRow, A] = if (withChangeFlag) { getConversionMapperWithChanges( - plan.getType, + convType, new RowSchema(logicalType), tpe, "DataStreamSinkConversion") } else { getConversionMapper( - plan.getType, + convType, new RowSchema(logicalType), tpe, "DataStreamSinkConversion") @@ -750,42 +762,19 @@ abstract class StreamTableEnvironment( val rootParallelism = plan.getParallelism - val rowtimeFields = logicalType.getFieldList.asScala - .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) - - if (rowtimeFields.isEmpty) { + val withRowtime = if (rowtimeFields.isEmpty) { // no rowtime field to set - conversion match { - case mapFunction: 
MapFunction[CRow, A] => - plan.map(mapFunction) - .returns(tpe) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .setParallelism(rootParallelism) - } - } else if (rowtimeFields.size == 1) { - // set the only rowtime field as event-time timestamp for DataStream - val mapFunction = conversion match { - case mapFunction: MapFunction[CRow, A] => mapFunction - case _ => new MapFunction[CRow, A] { - override def map(cRow: CRow): A = cRow.asInstanceOf[A] - } - } - - plan.process( - new WrappingTimestampSetterProcessFunction[A]( - mapFunction, - rowtimeFields.head.getIndex)) - .returns(tpe) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .setParallelism(rootParallelism) - + plan.map(conversion) } else { - throw new TableException( - s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " + - s"the table that should be converted to a DataStream.\n" + - s"Please select the rowtime field that should be used as event-time timestamp for the " + - s"DataStream by casting all other fields to TIMESTAMP.") + // set the only rowtime field as event-time timestamp for DataStream + // and convert it to SQL timestamp + plan.process(new OutputRowtimeProcessFunction[A](conversion, rowtimeFields.head.getIndex)) } + + withRowtime + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .setParallelism(rootParallelism) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index b647c51..2e9e18f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -38,7 +38,7 @@ import org.apache.calcite.tools._ import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils._ +import org.apache.flink.api.java.typeutils.{RowTypeInfo, _} import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} @@ -48,26 +48,23 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} -import org.apache.flink.table.codegen.{FunctionCodeGenerator, ExpressionReducer, GeneratedFunction} -import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} -import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ -import org.apache.flink.table.functions.AggregateFunction -import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} -import org.apache.flink.table.functions.{ScalarFunction, TableFunction} +import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction} +import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, 
createScalarSqlFunction, createTableSqlFunctions, _} +import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{RelTable, RowSchema} import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.validate.FunctionCatalog import org.apache.flink.types.Row -import org.apache.flink.api.java.typeutils.RowTypeInfo -import _root_.scala.collection.JavaConverters._ -import _root_.scala.collection.mutable.HashMap import _root_.scala.annotation.varargs +import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable /** * The abstract base class for batch and stream TableEnvironments. @@ -108,10 +105,10 @@ abstract class TableEnvironment(val config: TableConfig) { private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0) // registered external catalog names -> catalog - private val externalCatalogs = new HashMap[String, ExternalCatalog] + private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog] /** Returns the table config to define the runtime behavior of the Table API. */ - def getConfig = config + def getConfig: TableConfig = config /** * Returns the operator table for this environment including a custom Calcite configuration. 
@@ -692,7 +689,7 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } - case r: RowTypeInfo => { + case r: RowTypeInfo => exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => Some((idx, name)) @@ -707,8 +704,7 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } - - } + case tpe => throw new TableException( s"Source of type $tpe cannot be converted into Table.") } @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException("Field name can not be '*'.") } - (fieldNames.toArray, fieldIndexes.toArray) + (fieldNames.toArray, fieldIndexes.toArray) // build fails in Scala 2.10 if not converted } protected def generateRowConverterFunction[OUT]( inputTypeInfo: TypeInformation[Row], schema: RowSchema, requestedTypeInfo: TypeInformation[OUT], - functionName: String): - GeneratedFunction[MapFunction[Row, OUT], OUT] = { + functionName: String) + : Option[GeneratedFunction[MapFunction[Row, OUT], OUT]] = { // validate that at least the field types of physical and logical type match // we do that here to make sure that plan translation was correct if (schema.typeInfo != inputTypeInfo) { throw TableException( s"The field types of physical and logical row types do not match. " + - s"Physical type is [${schema.typeInfo}], Logical type is [${inputTypeInfo}]. " + + s"Physical type is [${schema.typeInfo}], Logical type is [$inputTypeInfo]. " + s"This is a bug and should not happen. 
Please file an issue.") } + // generic row needs no conversion + if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] && + requestedTypeInfo.getTypeClass == classOf[Row]) { + return None + } + val fieldTypes = schema.fieldTypeInfos val fieldNames = schema.fieldNames - // validate requested type + // check for valid type info if (requestedTypeInfo.getArity != fieldTypes.length) { throw new TableException( - s"Arity[${fieldTypes.length}] of result[${fieldTypes}] does not match " + - s"the number[${requestedTypeInfo.getArity}] of requested type[${requestedTypeInfo}].") + s"Arity [${fieldTypes.length}] of result [$fieldTypes] does not match " + + s"the number[${requestedTypeInfo.getArity}] of requested type [$requestedTypeInfo].") + } + + // check requested types + + def validateFieldType(fieldType: TypeInformation[_]): Unit = fieldType match { + case _: TimeIndicatorTypeInfo => + throw new TableException("The time indicator type is an internal type only.") + case _ => // ok } requestedTypeInfo match { @@ -758,9 +768,10 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException(s"POJO does not define field name: $fName") } val requestedTypeInfo = pt.getTypeAt(pojoIdx) + validateFieldType(requestedTypeInfo) if (fType != requestedTypeInfo) { throw new TableException(s"Result field does not match requested type. " + - s"requested: $requestedTypeInfo; Actual: $fType") + s"Requested: $requestedTypeInfo; Actual: $fType") } } @@ -769,6 +780,7 @@ abstract class TableEnvironment(val config: TableConfig) { fieldTypes.zipWithIndex foreach { case (fieldTypeInfo, i) => val requestedTypeInfo = tt.getTypeAt(i) + validateFieldType(requestedTypeInfo) if (fieldTypeInfo != requestedTypeInfo) { throw new TableException(s"Result field does not match requested type. 
" + s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo") @@ -781,10 +793,11 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException(s"Requested result type is an atomic type but " + s"result[$fieldTypes] has more or less than a single field.") } - val fieldTypeInfo = fieldTypes.head - if (fieldTypeInfo != at) { + val requestedTypeInfo = fieldTypes.head + validateFieldType(requestedTypeInfo) + if (requestedTypeInfo != at) { throw new TableException(s"Result field does not match requested type. " + - s"Requested: $at; Actual: $fieldTypeInfo") + s"Requested: $at; Actual: $requestedTypeInfo") } case _ => @@ -809,11 +822,13 @@ abstract class TableEnvironment(val config: TableConfig) { |return ${conversion.resultTerm}; |""".stripMargin - generator.generateFunction( + val generated = generator.generateFunction( functionName, classOf[MapFunction[Row, OUT]], body, requestedTypeInfo) + + Some(generated) } } @@ -972,7 +987,7 @@ object TableEnvironment { validateType(inputType) inputType match { - case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt(_)).toArray + case t: CompositeType[_] => 0.until(t.getArity).map(i => t.getTypeAt(i)).toArray case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]]) case tpe => throw new TableException(s"Currently only CompositeType and AtomicType are supported.") http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index 717a1af..1f88737 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -27,7 +27,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, _} -import org.apache.flink.table.functions.ProctimeSqlFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala index 680eb44..25527cc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.codegen.Indenter.toISC import org.apache.flink.table.codegen.CodeGenUtils.newName import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString} import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, SingleElementIterable} @@ -47,7 +48,6 @@ class AggregationCodeGenerator( * * @param name Class name of the function. 
* Does not need to be unique but has to be a valid Java class identifier. - * @param generator The code generator instance * @param physicalInputTypes Physical input row types * @param aggregates All aggregate functions * @param aggFields Indexes of the input fields for all aggregate functions @@ -68,7 +68,6 @@ class AggregationCodeGenerator( */ def generateAggregations( name: String, - generator: CodeGenerator, physicalInputTypes: Seq[TypeInformation[_]], aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], @@ -86,28 +85,29 @@ class AggregationCodeGenerator( // get unique function name val funcName = newName(name) // register UDAGGs - val aggs = aggregates.map(a => generator.addReusableFunction(a)) + val aggs = aggregates.map(a => addReusableFunction(a)) // get java types of accumulators val accTypeClasses = aggregates.map { a => a.getClass.getMethod("createAccumulator").getReturnType } val accTypes = accTypeClasses.map(_.getCanonicalName) - // get java classes of input fields - val javaClasses = physicalInputTypes.map(t => t.getTypeClass) // get parameter lists for aggregation functions - val parameters = aggFields.map { inFields => + val parametersCode = aggFields.map { inFields => val fields = for (f <- inFields) yield - s"(${javaClasses(f).getCanonicalName}) input.getField($f)" + s"(${CodeGenUtils.boxedTypeTermForTypeInfo(physicalInputTypes(f))}) input.getField($f)" fields.mkString(", ") } - val methodSignaturesList = aggFields.map { - inFields => for (f <- inFields) yield javaClasses(f) + + // get method signatures + val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes) + val methodSignaturesList = aggFields.map { inFields => + inFields.map(classes(_)) } // check and validate the needed methods aggregates.zipWithIndex.map { - case (a, i) => { + case (a, i) => getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i)) .getOrElse( throw new CodeGenException( @@ -159,7 +159,6 
@@ class AggregationCodeGenerator( s"aggregate ${a.getClass.getCanonicalName}'.") ) } - } } def genSetAggregationResults: String = { @@ -208,7 +207,7 @@ class AggregationCodeGenerator( j""" | ${aggs(i)}.accumulate( | ((${accTypes(i)}) accs.getField($i)), - | ${parameters(i)});""".stripMargin + | ${parametersCode(i)});""".stripMargin }.mkString("\n") j"""$sig { @@ -229,7 +228,7 @@ class AggregationCodeGenerator( j""" | ${aggs(i)}.retract( | ((${accTypes(i)}) accs.getField($i)), - | ${parameters(i)});""".stripMargin + | ${parametersCode(i)});""".stripMargin }.mkString("\n") if (needRetract) { http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala index 1d8c926..161f9a3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala @@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils} +import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo, TypeCheckUtils} object CodeGenUtils { @@ -90,6 +90,9 @@ object CodeGenUtils { case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]" case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]" + // time indicators are represented as Long even if they seem to be Timestamp + 
case _: TimeIndicatorTypeInfo => "java.lang.Long" + case _ => tpe.getTypeClass.getCanonicalName } @@ -123,8 +126,10 @@ object CodeGenUtils { def qualifyEnum(enum: Enum[_]): String = enum.getClass.getCanonicalName + "." + enum.name() - def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String) = + def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String): String = resultType match { + case _: TimeIndicatorTypeInfo => + resultTerm // time indicators are not modified case SqlTimeTypeInfo.DATE => s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)" case SqlTimeTypeInfo.TIME => @@ -133,7 +138,7 @@ object CodeGenUtils { s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)" } - def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String) = + def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String): String = resultType match { case SqlTimeTypeInfo.DATE => s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)" @@ -157,43 +162,43 @@ object CodeGenUtils { // ---------------------------------------------------------------------------------------------- - def requireNumeric(genExpr: GeneratedExpression) = + def requireNumeric(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isNumeric(genExpr.resultType)) { throw new CodeGenException("Numeric expression type expected, but was " + s"'${genExpr.resultType}'.") } - def requireComparable(genExpr: GeneratedExpression) = + def requireComparable(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isComparable(genExpr.resultType)) { throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.") } - def requireString(genExpr: GeneratedExpression) = + def requireString(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isString(genExpr.resultType)) { throw new CodeGenException("String expression type expected.") } - def requireBoolean(genExpr: 
GeneratedExpression) = + def requireBoolean(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isBoolean(genExpr.resultType)) { throw new CodeGenException("Boolean expression type expected.") } - def requireTemporal(genExpr: GeneratedExpression) = + def requireTemporal(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isTemporal(genExpr.resultType)) { throw new CodeGenException("Temporal expression type expected.") } - def requireTimeInterval(genExpr: GeneratedExpression) = + def requireTimeInterval(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) { throw new CodeGenException("Interval expression type expected.") } - def requireArray(genExpr: GeneratedExpression) = + def requireArray(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isArray(genExpr.resultType)) { throw new CodeGenException("Array expression type expected.") } - def requireInteger(genExpr: GeneratedExpression) = + def requireInteger(genExpr: GeneratedExpression): Unit = if (!TypeCheckUtils.isInteger(genExpr.resultType)) { throw new CodeGenException("Integer expression type expected.") } @@ -243,7 +248,7 @@ object CodeGenUtils { val fieldName = pt.getFieldNames()(index) getFieldAccessor(pt.getTypeClass, fieldName) - case _ => throw new CodeGenException(s"Unsupported composite type: '${compType}'") + case _ => throw new CodeGenException(s"Unsupported composite type: '$compType'") } } http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index be55eac..946c6cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -39,9 +39,9 @@ import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} import org.apache.flink.table.codegen.calls.FunctionGenerator import org.apache.flink.table.codegen.calls.ScalarOperators._ -import org.apache.flink.table.functions.sql.ScalarSqlFunctions +import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils -import org.apache.flink.table.functions.{FunctionContext, ProctimeSqlFunction, UserDefinedFunction} +import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils._ @@ -328,13 +328,13 @@ abstract class CodeGenerator( // initial type check if (returnType.getArity != fieldExprs.length) { throw new CodeGenException( - s"Arity[${returnType.getArity}] of result type[$returnType] does not match " + - s"number[${fieldExprs.length}] of expressions[$fieldExprs].") + s"Arity [${returnType.getArity}] of result type [$returnType] does not match " + + s"number [${fieldExprs.length}] of expressions [$fieldExprs].") } if (resultFieldNames.length != fieldExprs.length) { throw new CodeGenException( - s"Arity[${resultFieldNames.length}] of result field names[$resultFieldNames] does not " + - s"match number[${fieldExprs.length}] of expressions[$fieldExprs].") + s"Arity [${resultFieldNames.length}] of result field names [$resultFieldNames] does not " + + s"match number [${fieldExprs.length}] of expressions [$fieldExprs].") } // type check returnType match { @@ -342,8 +342,8 @@ abstract class CodeGenerator( fieldExprs.zipWithIndex foreach { case (fieldExpr, i) if fieldExpr.resultType != pt.getTypeAt(resultFieldNames(i)) => throw new CodeGenException( - s"Incompatible 
types of expression and result type. Expression[$fieldExpr] type is " + - s"[${fieldExpr.resultType}], result type is [${pt.getTypeAt(resultFieldNames(i))}]") + s"Incompatible types of expression and result type. Expression [$fieldExpr] type is" + + s" [${fieldExpr.resultType}], result type is [${pt.getTypeAt(resultFieldNames(i))}]") case _ => // ok } @@ -359,7 +359,7 @@ abstract class CodeGenerator( case at: AtomicType[_] if at != fieldExprs.head.resultType => throw new CodeGenException( - s"Incompatible types of expression and result type. Expression[${fieldExprs.head}] " + + s"Incompatible types of expression and result type. Expression [${fieldExprs.head}] " + s"type is [${fieldExprs.head.resultType}], result type is [$at]") case _ => // ok @@ -1303,11 +1303,10 @@ abstract class CodeGenerator( private[flink] def generateProctimeTimestamp(): GeneratedExpression = { val resultTerm = newName("result") - val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP) val resultCode = s""" - |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime(); + |long $resultTerm = $contextTerm.timerService().currentProcessingTime(); |""".stripMargin GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP) } http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 01e9dff..7de7aca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -26,7 +26,7 @@ 
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression} -import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCoercion} +import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo, TypeCoercion} import org.apache.flink.table.typeutils.TypeCheckUtils._ object ScalarOperators { @@ -543,6 +543,11 @@ object ScalarOperators { operand: GeneratedExpression, targetType: TypeInformation[_]) : GeneratedExpression = (operand.resultType, targetType) match { + + // special case: cast from TimeIndicatorTypeInfo to SqlTimeTypeInfo + case (ti: TimeIndicatorTypeInfo, SqlTimeTypeInfo.TIMESTAMP) => + operand.copy(resultType = SqlTimeTypeInfo.TIMESTAMP) // just replace the TypeInformation + // identity casting case (fromTp, toTp) if fromTp == toTp => operand http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala deleted file mode 100644 index 4fb0378..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.functions - -import org.apache.calcite.sql._ -import org.apache.calcite.sql.`type`._ -import org.apache.calcite.sql.validate.SqlMonotonicity - -/** - * Function that materializes a processing time attribute. - * After materialization the result can be used in regular arithmetical calculations. - */ -object ProctimeSqlFunction - extends SqlFunction( - "PROCTIME", - SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.TIMESTAMP), - InferTypes.RETURN_TYPE, - OperandTypes.family(SqlTypeFamily.TIMESTAMP), - SqlFunctionCategory.SYSTEM) { - - override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION - - override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity = - SqlMonotonicity.INCREASING -} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala new file mode 100644 index 0000000..f30ad2f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala @@ -0,0 +1,43 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions.sql + +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.validate.SqlMonotonicity + +/** + * Function that materializes a processing time attribute. + * After materialization the result can be used in regular arithmetical calculations. 
+ */ +object ProctimeSqlFunction + extends SqlFunction( + "PROCTIME", + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.TIMESTAMP), + InferTypes.RETURN_TYPE, + OperandTypes.family(SqlTypeFamily.TIMESTAMP), + SqlFunctionCategory.SYSTEM) { + + override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION + + override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity = + SqlMonotonicity.INCREASING + + override def isDeterministic: Boolean = false +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index 47469d1..b44c28e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -175,11 +175,11 @@ object UserDefinedFunctionUtils { } }) { throw new ValidationException( - s"Scala-style variable arguments in '${methodName}' methods are not supported. Please " + + s"Scala-style variable arguments in '$methodName' methods are not supported. 
Please " + s"add a @scala.annotation.varargs annotation.") } else if (found.length > 1) { throw new ValidationException( - s"Found multiple '${methodName}' methods which match the signature.") + s"Found multiple '$methodName' methods which match the signature.") } found.headOption } @@ -218,7 +218,7 @@ object UserDefinedFunctionUtils { if (methods.isEmpty) { throw new ValidationException( s"Function class '${function.getClass.getCanonicalName}' does not implement at least " + - s"one method named '${methodName}' which is public, not abstract and " + + s"one method named '$methodName' which is public, not abstract and " + s"(in case of table functions) not static.") } http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index ac63be1..0cf86f7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -38,7 +38,7 @@ import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._ import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules -import org.apache.flink.table.runtime.TimestampSetterProcessFunction +import org.apache.flink.table.runtime.RowtimeProcessFunction import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import 
org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -149,7 +149,7 @@ class DataStreamGroupWindowAggregate( inputDS .process( - new TimestampSetterProcessFunction(timeIdx, CRowTypeInfo(inputSchema.typeInfo))) + new RowtimeProcessFunction(timeIdx, CRowTypeInfo(inputSchema.typeInfo))) .setParallelism(inputDS.getParallelism) .name(s"time attribute: ($timeAttribute)") } else { http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala deleted file mode 100644 index 109c6e1..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.runtime - -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.configuration.Configuration -import org.apache.flink.table.codegen.Compiler -import org.apache.flink.table.runtime.types.CRow -import org.apache.flink.types.Row -import org.slf4j.LoggerFactory - -/** - * MapRunner with [[CRow]] input. - */ -class CRowInputMapRunner[OUT]( - name: String, - code: String, - @transient var returnType: TypeInformation[OUT]) - extends RichMapFunction[CRow, OUT] - with ResultTypeQueryable[OUT] - with Compiler[MapFunction[Row, OUT]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: MapFunction[Row, OUT] = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating MapFunction.") - function = clazz.newInstance() - } - - override def map(in: CRow): OUT = { - function.map(in.row) - } - - override def getProducedType: TypeInformation[OUT] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala deleted file mode 100644 index 6b3aa44..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - 
* or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime - -import java.lang.{Boolean => JBool} -import java.sql.Timestamp - -import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.configuration.Configuration -import org.apache.flink.table.codegen.Compiler -import org.apache.flink.table.runtime.types.CRow -import org.apache.flink.types.Row -import org.slf4j.LoggerFactory -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} - -/** - * Convert [[CRow]] to a [[JTuple2]] - */ -class CRowInputJavaTupleOutputMapRunner( - name: String, - code: String, - @transient var returnType: TypeInformation[JTuple2[JBool, Any]]) - extends RichMapFunction[CRow, Any] - with ResultTypeQueryable[JTuple2[JBool, Any]] - with Compiler[MapFunction[Row, Any]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: MapFunction[Row, Any] = _ - private var tupleWrapper: JTuple2[JBool, Any] = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") - val clazz = 
compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating MapFunction.") - function = clazz.newInstance() - tupleWrapper = new JTuple2[JBool, Any]() - } - - override def map(in: CRow): JTuple2[JBool, Any] = { - tupleWrapper.f0 = in.change - tupleWrapper.f1 = function.map(in.row) - tupleWrapper - } - - override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType -} - -/** - * Convert [[CRow]] to a [[Tuple2]] - */ -class CRowInputScalaTupleOutputMapRunner( - name: String, - code: String, - @transient var returnType: TypeInformation[(Boolean, Any)]) - extends RichMapFunction[CRow, (Boolean, Any)] - with ResultTypeQueryable[(Boolean, Any)] - with Compiler[MapFunction[Row, Any]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: MapFunction[Row, Any] = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating MapFunction.") - function = clazz.newInstance() - } - - override def map(in: CRow): (Boolean, Any) = - (in.change, function.map(in.row)) - - override def getProducedType: TypeInformation[(Boolean, Any)] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala new file mode 100644 index 0000000..9ed9188 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.LoggerFactory + +/** + * MapRunner with [[CRow]] input. 
+ */ +class CRowMapRunner[OUT]( + name: String, + code: String, + @transient var returnType: TypeInformation[OUT]) + extends RichMapFunction[CRow, OUT] + with ResultTypeQueryable[OUT] + with Compiler[MapFunction[Row, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, OUT] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + } + + override def map(in: CRow): OUT = { + function.map(in.row) + } + + override def getProducedType: TypeInformation[OUT] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala new file mode 100644 index 0000000..3eaeea3 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * Wraps a ProcessFunction and sets a Timestamp field of a CRow as + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp. 
+ */ +class OutputRowtimeProcessFunction[OUT]( + function: MapFunction[CRow, OUT], + rowtimeIdx: Int) + extends ProcessFunction[CRow, OUT] { + + override def open(parameters: Configuration): Unit = { + FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) + FunctionUtils.openFunction(function, parameters) + } + + override def processElement( + in: CRow, + ctx: ProcessFunction[CRow, OUT]#Context, + out: Collector[OUT]): Unit = { + + val timestamp = in.row.getField(rowtimeIdx).asInstanceOf[Long] + out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestamp) + + val convertedTimestamp = SqlFunctions.internalToTimestamp(timestamp) + in.row.setField(rowtimeIdx, convertedTimestamp) + + out.collect(function.map(in)) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala new file mode 100644 index 0000000..e192b07 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * ProcessFunction to copy a timestamp from a [[org.apache.flink.types.Row]] field into the + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]]. + */ +class RowtimeProcessFunction( + val rowtimeIdx: Int, + @transient var returnType: TypeInformation[CRow]) + extends ProcessFunction[CRow, CRow] + with ResultTypeQueryable[CRow] { + + override def processElement( + in: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val timestamp = in.row.getField(rowtimeIdx).asInstanceOf[Long] + out.asInstanceOf[TimestampedCollector[CRow]].setAbsoluteTimestamp(timestamp) + out.collect(in) + } + + override def getProducedType: TypeInformation[CRow] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala deleted file mode 100644 index 00961f0..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime - -import java.sql.Timestamp - -import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.streaming.api.operators.TimestampedCollector -import org.apache.flink.table.runtime.types.CRow -import org.apache.flink.util.Collector - -/** - * ProcessFunction to copy a timestamp from a [[org.apache.flink.types.Row]] field into the - * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]]. 
- */ -class TimestampSetterProcessFunction( - val rowtimeIdx: Int, - @transient var returnType: TypeInformation[CRow]) - extends ProcessFunction[CRow, CRow] - with ResultTypeQueryable[CRow] { - - override def processElement( - in: CRow, - ctx: ProcessFunction[CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - - val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp]) - out.asInstanceOf[TimestampedCollector[CRow]].setAbsoluteTimestamp(timestamp) - out.collect(in) - } - - override def getProducedType: TypeInformation[CRow] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala deleted file mode 100644 index 8f12c30..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime - -import java.sql.Timestamp - -import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.configuration.Configuration -import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.streaming.api.operators.TimestampedCollector -import org.apache.flink.table.runtime.types.CRow -import org.apache.flink.util.Collector - -/** - * Wraps a ProcessFunction and sets a Timestamp field of a CRow as - * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp. - */ -class WrappingTimestampSetterProcessFunction[OUT]( - function: MapFunction[CRow, OUT], - rowtimeIdx: Int) - extends ProcessFunction[CRow, OUT] { - - override def open(parameters: Configuration): Unit = { - super.open(parameters) - function match { - case f: RichMapFunction[_, _] => - f.setRuntimeContext(getRuntimeContext) - f.open(parameters) - case _ => - } - } - - override def processElement( - in: CRow, - ctx: ProcessFunction[CRow, OUT]#Context, - out: Collector[OUT]): Unit = { - - val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp]) - out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestamp) - - out.collect(function.map(in)) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 52105e3..6304dc4 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -96,7 +96,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "UnboundedProcessingOverAggregateHelper", - generator, inputFieldTypeInfo, aggregates, aggFields, @@ -175,7 +174,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "NonWindowedAggregationHelper", - generator, inputFieldTypes, aggregates, aggFields, @@ -240,7 +238,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "BoundedOverAggregateHelper", - generator, inputFieldTypeInfo, aggregates, aggFields, @@ -372,7 +369,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "DataSetAggregatePrepareMapHelper", - generator, inputFieldTypeInfo, aggregates, aggFieldIndexes, @@ -451,7 +447,6 @@ object AggregateUtil { // sliding time-window for partial aggregations val genFunction = generator.generateAggregations( "DataSetAggregatePrepareMapHelper", - generator, physicalInputTypes, aggregates, aggFieldIndexes, @@ -555,7 +550,6 @@ object AggregateUtil { val genPreAggFunction = generator.generateAggregations( "GroupingWindowAggregateHelper", - generator, physicalInputTypes, aggregates, aggFieldIndexes, @@ -572,7 +566,6 @@ object AggregateUtil { val genFinalAggFunction = generator.generateAggregations( "GroupingWindowAggregateHelper", - generator, physicalInputTypes, aggregates, aggFieldIndexes, @@ -714,7 +707,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "GroupingWindowAggregateHelper", - generator, physicalInputTypes, aggregates, aggFieldIndexes, @@ -789,7 +781,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "GroupingWindowAggregateHelper", - generator, physicalInputTypes, aggregates, aggFieldIndexes, @@ -870,7 +861,6 @@ object AggregateUtil 
{ val genPreAggFunction = generator.generateAggregations( "DataSetAggregatePrepareMapHelper", - generator, inputFieldTypeInfo, aggregates, aggInFields, @@ -897,7 +887,6 @@ object AggregateUtil { val genFinalAggFunction = generator.generateAggregations( "DataSetAggregateFinalHelper", - generator, inputFieldTypeInfo, aggregates, aggInFields, @@ -921,7 +910,6 @@ object AggregateUtil { else { val genFunction = generator.generateAggregations( "DataSetAggregateHelper", - generator, inputFieldTypeInfo, aggregates, aggInFields, @@ -1019,7 +1007,6 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "GroupingWindowAggregateHelper", - generator, inputFieldTypeInfo, aggregates, aggFields, @@ -1214,7 +1201,7 @@ object AggregateUtil { case DECIMAL => new DecimalSumWithRetractAggFunction case sqlType: SqlTypeName => - throw new TableException(s"Sum aggregate does no support type: '${sqlType}'") + throw new TableException(s"Sum aggregate does no support type: '$sqlType'") } } else { aggregates(index) = sqlTypeName match { @@ -1233,7 +1220,7 @@ object AggregateUtil { case DECIMAL => new DecimalSumAggFunction case sqlType: SqlTypeName => - throw new TableException(s"Sum aggregate does no support type: '${sqlType}'") + throw new TableException(s"Sum aggregate does no support type: '$sqlType'") } } @@ -1255,7 +1242,7 @@ object AggregateUtil { case DECIMAL => new DecimalSum0WithRetractAggFunction case sqlType: SqlTypeName => - throw new TableException(s"Sum0 aggregate does no support type: '${sqlType}'") + throw new TableException(s"Sum0 aggregate does no support type: '$sqlType'") } } else { aggregates(index) = sqlTypeName match { @@ -1274,7 +1261,7 @@ object AggregateUtil { case DECIMAL => new DecimalSum0AggFunction case sqlType: SqlTypeName => - throw new TableException(s"Sum0 aggregate does no support type: '${sqlType}'") + throw new TableException(s"Sum0 aggregate does no support type: '$sqlType'") } } @@ -1295,7 +1282,7 @@ object AggregateUtil { case 
DECIMAL => new DecimalAvgAggFunction case sqlType: SqlTypeName => - throw new TableException(s"Avg aggregate does no support type: '${sqlType}'") + throw new TableException(s"Avg aggregate does no support type: '$sqlType'") } case sqlMinMaxFunction: SqlMinMaxAggFunction => @@ -1322,7 +1309,7 @@ object AggregateUtil { new StringMinWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException( - s"Min with retract aggregate does no support type: '${sqlType}'") + s"Min with retract aggregate does no support type: '$sqlType'") } } else { sqlTypeName match { @@ -1345,7 +1332,7 @@ object AggregateUtil { case VARCHAR | CHAR => new StringMinAggFunction case sqlType: SqlTypeName => - throw new TableException(s"Min aggregate does no support type: '${sqlType}'") + throw new TableException(s"Min aggregate does no support type: '$sqlType'") } } } else { @@ -1371,7 +1358,7 @@ object AggregateUtil { new StringMaxWithRetractAggFunction case sqlType: SqlTypeName => throw new TableException( - s"Max with retract aggregate does no support type: '${sqlType}'") + s"Max with retract aggregate does no support type: '$sqlType'") } } else { sqlTypeName match { @@ -1394,7 +1381,7 @@ object AggregateUtil { case VARCHAR | CHAR => new StringMaxAggFunction case sqlType: SqlTypeName => - throw new TableException(s"Max aggregate does no support type: '${sqlType}'") + throw new TableException(s"Max aggregate does no support type: '$sqlType'") } } } @@ -1463,7 +1450,7 @@ object AggregateUtil { relDataType.head.getIndex } else { throw TableException( - s"Encountered more than one time attribute with the same name: '${relDataType}'") + s"Encountered more than one time attribute with the same name: '$relDataType'") } case e => throw TableException( "The time attribute of window in batch environment should be " + http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index ceb986d..8a0d682 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -17,10 +17,8 @@ */ package org.apache.flink.table.runtime.aggregate -import java.sql.Timestamp import java.util.{ArrayList => JArrayList, List => JList} -import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} @@ -118,7 +116,7 @@ class RowTimeBoundedRangeOver( registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) // triggering timestamp for trigger calculation - val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp]) + val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long] val lastTriggeringTs = lastTriggeringTsState.value http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala index 678a3b7..ba65846 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala @@ -17,11 +17,9 @@ */ package org.apache.flink.table.runtime.aggregate -import java.sql.Timestamp import java.util import java.util.{List => JList} -import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} @@ -29,11 +27,11 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.api.StreamQueryConfig -import org.apache.flink.types.Row -import org.apache.flink.util.{Collector, Preconditions} import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.slf4j.LoggerFactory +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.slf4j.{Logger, LoggerFactory} /** * Process Function for ROWS clause event-time bounded OVER window @@ -56,6 +54,8 @@ class RowTimeBoundedRowsOver( Preconditions.checkNotNull(aggregationStateType) Preconditions.checkNotNull(precedingOffset) + val LOG: Logger = LoggerFactory.getLogger(this.getClass) + private var output: CRow = _ // the state which keeps the last triggering timestamp @@ -73,7 +73,6 @@ class RowTimeBoundedRowsOver( // to this time stamp. 
private var dataState: MapState[Long, JList[Row]] = _ - val LOG = LoggerFactory.getLogger(this.getClass) private var function: GeneratedAggregations = _ override def open(config: Configuration) { @@ -127,7 +126,7 @@ class RowTimeBoundedRowsOver( registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) // triggering timestamp for trigger calculation - val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp]) + val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long] val lastTriggeringTs = lastTriggeringTsState.value // check if the data is expired, if not, save the data and register event time timer http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala index fd58678..0d69355 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala @@ -17,24 +17,17 @@ */ package org.apache.flink.table.runtime.aggregate -import java.sql.Timestamp +import java.util.{Collections, ArrayList => JArrayList, List => JList} -import org.apache.flink.api.common.state.ValueState -import org.apache.flink.api.common.state.ValueStateDescriptor -import org.apache.flink.api.common.state.MapState -import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor} import 
org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} -import java.util.Collections -import java.util.{ArrayList => JArrayList, List => JList} - -import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.streaming.api.operators.TimestampedCollector /** * ProcessFunction to sort on event-time and possibly addtional secondary sort attributes. @@ -90,7 +83,7 @@ class RowTimeSortProcessFunction( val input = inputC.row // timestamp of the processed row - val rowtime = SqlFunctions.toLong(input.getField(rowtimeIdx).asInstanceOf[Timestamp]) + val rowtime = input.getField(rowtimeIdx).asInstanceOf[Long] val lastTriggeringTs = lastTriggeringTsState.value