Repository: flink Updated Branches: refs/heads/master 37b4e2cef -> 976d004ce
[FLINK-7934] [table] Clean up and add more EXTRACT tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/976d004c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/976d004c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/976d004c Branch: refs/heads/master Commit: 976d004ce8204d4c8020b86368e00c98537c657d Parents: 4816a6e Author: twalthr <[email protected]> Authored: Fri Jan 26 12:59:05 2018 +0100 Committer: twalthr <[email protected]> Committed: Fri Jan 26 13:20:48 2018 +0100 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkTypeFactory.scala | 2 +- .../table/codegen/calls/ExtractCallGen.scala | 27 ++++----- .../table/codegen/calls/FunctionGenerator.scala | 2 - .../table/expressions/ExpressionUtils.scala | 59 +------------------- .../flink/table/expressions/aggregations.scala | 46 ++++++++------- .../flink/table/expressions/symbols.scala | 2 +- .../apache/flink/table/expressions/time.scala | 37 ++---------- .../flink/table/api/batch/table/CalcTest.scala | 8 +-- .../table/expressions/ScalarFunctionsTest.scala | 41 ++++++++++++++ 9 files changed, 91 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 515a36d..db7ffdb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -322,7 +322,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp object FlinkTypeFactory { - def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { + private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { case BOOLEAN_TYPE_INFO => BOOLEAN case BYTE_TYPE_INFO => TINYINT case SHORT_TYPE_INFO => SMALLINT http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala index fe72733..140c7d9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala @@ -21,32 +21,29 @@ package org.apache.flink.table.codegen.calls import java.lang.reflect.Method import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange} -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.codegen.CodeGenUtils._ -import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression} import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression} class ExtractCallGen(returnType: TypeInformation[_], method: Method) extends MethodCallGen(returnType, method) { override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) : GeneratedExpression = { - val unit = getEnum(operands(0)).asInstanceOf[TimeUnitRange].startUnit - val sqlTypeName = FlinkTypeFactory.typeInfoToSqlTypeName(operands(1).resultType) + val unit = getEnum(operands.head).asInstanceOf[TimeUnitRange].startUnit + val tpe = operands(1).resultType unit match { case TimeUnit.YEAR | TimeUnit.MONTH | TimeUnit.DAY | TimeUnit.QUARTER | - TimeUnit.DOW | TimeUnit.DOY | TimeUnit.WEEK | TimeUnit.CENTURY | - TimeUnit.MILLENNIUM=> - sqlTypeName match { - case SqlTypeName.TIMESTAMP => + TimeUnit.MILLENNIUM => + tpe match { + case SqlTimeTypeInfo.TIMESTAMP => return generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) { (terms) => s""" @@ -55,7 +52,7 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method) |""".stripMargin } - case SqlTypeName.DATE => + case SqlTimeTypeInfo.DATE => return super.generate(codeGenerator, operands) case _ => // do nothing @@ -69,7 +66,7 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method) unit match { case TimeUnit.QUARTER => s""" - |((${terms(1)} % ${factor}) - 1) / ${unit.multiplier.intValue()} + 1 + |((${terms(1)} % $factor) - 1) / ${unit.multiplier.intValue()} + 1 |""".stripMargin case _ => if (factor == 1) { @@ -78,7 +75,7 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method) |""".stripMargin } else { s""" - |(${terms(1)} % ${factor}) / ${unit.multiplier.intValue()} + |(${terms(1)} % $factor) / ${unit.multiplier.intValue()} |""".stripMargin } } @@ -101,10 +98,10 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method) case TimeUnit.QUARTER => TimeUnit.YEAR.multiplier.longValue() case TimeUnit.YEAR | - TimeUnit.DECADE | TimeUnit.CENTURY | TimeUnit.MILLENNIUM => 1L - case _ => throw new CodeGenException("unit %s is NOT supported.".format(unit)) + case _ => + throw new CodeGenException(s"Unit '$unit' is not supported.") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 9cd67c8..2cd7388 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.codegen.calls import java.lang.reflect.Method -import org.apache.calcite.avatica.SqlType import org.apache.calcite.avatica.util.TimeUnitRange import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable._ @@ -445,7 +444,6 @@ object FunctionGenerator { Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.TIMESTAMP), new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method)) - addSqlFunction( EXTRACT, Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.TIME), http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala index 08abc8f..013c8ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala @@ -22,15 +22,11 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float = import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Time, Timestamp} -import org.apache.calcite.avatica.util.TimeUnit -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex.{RexBuilder, RexNode} -import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime} import org.apache.flink.table.api.ValidationException import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} -import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime} object ExpressionUtils { @@ -137,57 +133,4 @@ object ExpressionUtils { } } } - - // ---------------------------------------------------------------------------------------------- - // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable) - // ---------------------------------------------------------------------------------------------- - - /** - * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#getFactor()]]. - */ - private[flink] def getFactor(unit: TimeUnit): JBigDecimal = unit match { - case TimeUnit.DAY => java.math.BigDecimal.ONE - case TimeUnit.HOUR => TimeUnit.DAY.multiplier - case TimeUnit.MINUTE => TimeUnit.HOUR.multiplier - case TimeUnit.SECOND => TimeUnit.MINUTE.multiplier - case TimeUnit.YEAR => java.math.BigDecimal.ONE - case TimeUnit.MONTH => TimeUnit.YEAR.multiplier - case _ => throw new IllegalArgumentException("Invalid start unit.") - } - - /** - * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#mod()]]. - */ - private[flink] def mod( - rexBuilder: RexBuilder, - resType: RelDataType, - res: RexNode, - value: JBigDecimal) - : RexNode = { - if (value == JBigDecimal.ONE) return res - rexBuilder.makeCall(SqlStdOperatorTable.MOD, res, rexBuilder.makeExactLiteral(value, resType)) - } - - /** - * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#divide()]]. - */ - private[flink] def divide(rexBuilder: RexBuilder, res: RexNode, value: JBigDecimal): RexNode = { - if (value == JBigDecimal.ONE) return res - if (value.compareTo(JBigDecimal.ONE) < 0 && value.signum == 1) { - try { - val reciprocal = JBigDecimal.ONE.divide(value, JBigDecimal.ROUND_UNNECESSARY) - return rexBuilder.makeCall( - SqlStdOperatorTable.MULTIPLY, - res, - rexBuilder.makeExactLiteral(reciprocal)) - } catch { - case e: ArithmeticException => // ignore - } - } - rexBuilder.makeCall( - SqlStdOperatorTable.DIVIDE_INTEGER, - res, - rexBuilder.makeExactLiteral(value)) - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala index 51526b2..47d1519 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala @@ -56,7 +56,7 @@ case class Sum(child: Expression) extends Aggregation { override def toString = s"sum($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -77,7 +77,7 @@ case class Sum0(child: Expression) extends Aggregation { override def toString = s"sum0($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -86,7 +86,7 @@ case class Sum0(child: Expression) extends Aggregation { TypeCheckUtils.assertNumericExpr(child.resultType, "sum0") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = - new SqlSumEmptyIsZeroAggFunction() + SqlStdOperatorTable.SUM0 } case class Min(child: Expression) extends Aggregation { @@ -94,7 +94,7 @@ case class Min(child: Expression) extends Aggregation { override def toString = s"min($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -103,7 +103,7 @@ case class Min(child: Expression) extends Aggregation { TypeCheckUtils.assertOrderableExpr(child.resultType, "min") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { - new SqlMinMaxAggFunction(MIN) + SqlStdOperatorTable.MIN } } @@ -112,7 +112,7 @@ case class Max(child: Expression) extends Aggregation { override def toString = s"max($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -121,7 +121,7 @@ case class Max(child: Expression) extends Aggregation { TypeCheckUtils.assertOrderableExpr(child.resultType, "max") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { - new SqlMinMaxAggFunction(MAX) + SqlStdOperatorTable.MAX } } @@ -130,13 +130,13 @@ case class Count(child: Expression) extends Aggregation { override def toString = s"count($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, false, null, name, child.toRexNode) } override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { - new SqlCountAggFunction("COUNT") + SqlStdOperatorTable.COUNT } } @@ -145,7 +145,7 @@ case class Avg(child: Expression) extends Aggregation { override def toString = s"avg($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -154,7 +154,7 @@ case class Avg(child: Expression) extends Aggregation { TypeCheckUtils.assertNumericExpr(child.resultType, "avg") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { - new SqlAvgAggFunction(AVG) + SqlStdOperatorTable.AVG } } @@ -163,7 +163,8 @@ case class StddevPop(child: Expression) extends Aggregation { override def toString = s"stddev_pop($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, false, null, name, child.toRexNode) + relBuilder.aggregateCall( + SqlStdOperatorTable.STDDEV_POP, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -172,7 +173,7 @@ case class StddevPop(child: Expression) extends Aggregation { TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_pop") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = - new SqlAvgAggFunction(STDDEV_POP) + SqlStdOperatorTable.STDDEV_POP } case class StddevSamp(child: Expression) extends Aggregation { @@ -180,7 +181,8 @@ case class StddevSamp(child: Expression) extends Aggregation { override def toString = s"stddev_samp($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_SAMP, false, null, name, child.toRexNode) + relBuilder.aggregateCall( + SqlStdOperatorTable.STDDEV_SAMP, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -189,7 +191,7 @@ case class StddevSamp(child: Expression) extends Aggregation { TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_samp") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = - new SqlAvgAggFunction(STDDEV_SAMP) + SqlStdOperatorTable.STDDEV_SAMP } case class VarPop(child: Expression) extends Aggregation { @@ -197,7 +199,7 @@ case class VarPop(child: Expression) extends Aggregation { override def toString = s"var_pop($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, false, null, name, child.toRexNode) + relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -206,7 +208,7 @@ case class VarPop(child: Expression) extends Aggregation { TypeCheckUtils.assertNumericExpr(child.resultType, "var_pop") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = - new SqlAvgAggFunction(VAR_POP) + SqlStdOperatorTable.VAR_POP } case class VarSamp(child: Expression) extends Aggregation { @@ -214,7 +216,8 @@ case class VarSamp(child: Expression) extends Aggregation { override def toString = s"var_samp($child)" override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, false, null, name, child.toRexNode) + relBuilder.aggregateCall( + SqlStdOperatorTable.VAR_SAMP, false, false, null, name, child.toRexNode) } override private[flink] def resultType = child.resultType @@ -223,7 +226,7 @@ case class VarSamp(child: Expression) extends Aggregation { TypeCheckUtils.assertNumericExpr(child.resultType, "var_samp") override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = - new SqlAvgAggFunction(VAR_SAMP) + SqlStdOperatorTable.VAR_SAMP } case class AggFunctionCall( @@ -254,10 +257,11 @@ case class AggFunctionCall( } } - override def toString(): String = s"${aggregateFunction.getClass.getSimpleName}($args)" + override def toString: String = s"${aggregateFunction.getClass.getSimpleName}($args)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { - relBuilder.aggregateCall(this.getSqlAggFunction(), false, null, name, args.map(_.toRexNode): _*) + relBuilder.aggregateCall( + this.getSqlAggFunction(), false, false, null, name, args.map(_.toRexNode): _*) } override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = { http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala index 0d71fb2..4faf8d3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala @@ -34,7 +34,7 @@ case class SymbolExpression(symbol: TableSymbol) extends LeafExpression { override private[flink] def resultType: TypeInformation[_] = throw new UnsupportedOperationException("This should not happen. A symbol has no result type.") - def toExpr = this // triggers implicit conversion + def toExpr: SymbolExpression = this // triggers implicit conversion override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { // dirty hack to pass Java enums to Java from Scala http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala index 5d75cd4..f231343 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala @@ -18,16 +18,13 @@ package org.apache.flink.table.expressions -import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange} +import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.rex._ -import org.apache.calcite.sql.SqlFunctionCategory -import org.apache.calcite.sql.`type`.{SqlOperandTypeChecker, SqlTypeName} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.calcite.FlinkRelBuilder -import org.apache.flink.table.expressions.ExpressionUtils.{divide, getFactor, mod} import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit import org.apache.flink.table.functions.sql.ScalarSqlFunctions import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval @@ -75,33 +72,11 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E override def toString: String = s"($temporal).extract($timeIntervalUnit)" override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - // get wrapped Calcite unit - val timeUnitRange = timeIntervalUnit - .asInstanceOf[SymbolExpression] - .symbol - .enum - .asInstanceOf[TimeUnitRange] - - relBuilder.getRexBuilder - // convert RexNodes - convertFunction( - timeIntervalUnit.toRexNode, - temporal.toRexNode, - relBuilder.asInstanceOf[FlinkRelBuilder]) - } - - // Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertFunction()]] - private def convertFunction(timeUnitRangeRexNode: RexNode, - temporal: RexNode, - relBuilder: FlinkRelBuilder): RexNode = { - val rexBuilder = relBuilder.getRexBuilder - val resultType = relBuilder - .getTypeFactory() - .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true) - rexBuilder.makeCall( - resultType, - SqlStdOperatorTable.EXTRACT, - Seq(timeUnitRangeRexNode, temporal)) + relBuilder + .getRexBuilder + .makeCall( + SqlStdOperatorTable.EXTRACT, + Seq(timeIntervalUnit.toRexNode, temporal.toRexNode)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala index 5bdd9dc..c2fa647 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala @@ -235,8 +235,6 @@ class CalcTest extends TableTestBase { util.verifyTable(resultTable, expected) } - // As stated in https://issues.apache.org/jira/browse/CALCITE-1584, Calcite planner doesn't - // promise to retain field names. @Test def testSelectFromGroupedTableWithNonTrivialKey(): Unit = { val util = batchTestUtil() @@ -251,6 +249,8 @@ class CalcTest extends TableTestBase { unaryNode( "DataSetCalc", batchTableNode(0), + // As stated in https://issues.apache.org/jira/browse/CALCITE-1584 + // Calcite planner doesn't promise to retain field names. term("select", "a", "c", "UPPER(c) AS $f2") ), term("groupBy", "$f2"), @@ -262,8 +262,6 @@ class CalcTest extends TableTestBase { util.verifyTable(resultTable, expected) } - // As stated in https://issues.apache.org/jira/browse/CALCITE-1584, Calcite planner doesn't - // promise to retain field names. @Test def testSelectFromGroupedTableWithFunctionKey(): Unit = { val util = batchTestUtil() @@ -278,6 +276,8 @@ class CalcTest extends TableTestBase { unaryNode( "DataSetCalc", batchTableNode(0), + // As stated in https://issues.apache.org/jira/browse/CALCITE-1584 + // Calcite planner doesn't promise to retain field names. term("select", "a", "c", "MyHashCode$(c) AS $f2") ), term("groupBy", "$f2"), http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 281dd90..d449fba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -1425,6 +1425,47 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f20.extract(YEAR)", "EXTRACT(YEAR FROM f20)", "2") + + // test SQL only time units + testSqlApi( + "EXTRACT(MILLENNIUM FROM f18)", + "2") + + testSqlApi( + "EXTRACT(MILLENNIUM FROM f16)", + "2") + + testSqlApi( + "EXTRACT(CENTURY FROM f18)", + "20") + + testSqlApi( + "EXTRACT(CENTURY FROM f16)", + "20") + + testSqlApi( + "EXTRACT(DOY FROM f18)", + "315") + + testSqlApi( + "EXTRACT(DOY FROM f16)", + "315") + + testSqlApi( + "EXTRACT(QUARTER FROM f18)", + "4") + + testSqlApi( + "EXTRACT(QUARTER FROM f16)", + "4") + + testSqlApi( + "EXTRACT(WEEK FROM f18)", + "45") + + testSqlApi( + "EXTRACT(WEEK FROM f16)", + "45") } @Test
