Repository: flink
Updated Branches:
  refs/heads/master 37b4e2cef -> 976d004ce


[FLINK-7934] [table] Clean up and add more EXTRACT tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/976d004c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/976d004c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/976d004c

Branch: refs/heads/master
Commit: 976d004ce8204d4c8020b86368e00c98537c657d
Parents: 4816a6e
Author: twalthr <[email protected]>
Authored: Fri Jan 26 12:59:05 2018 +0100
Committer: twalthr <[email protected]>
Committed: Fri Jan 26 13:20:48 2018 +0100

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  |  2 +-
 .../table/codegen/calls/ExtractCallGen.scala    | 27 ++++-----
 .../table/codegen/calls/FunctionGenerator.scala |  2 -
 .../table/expressions/ExpressionUtils.scala     | 59 +-------------------
 .../flink/table/expressions/aggregations.scala  | 46 ++++++++-------
 .../flink/table/expressions/symbols.scala       |  2 +-
 .../apache/flink/table/expressions/time.scala   | 37 ++----------
 .../flink/table/api/batch/table/CalcTest.scala  |  8 +--
 .../table/expressions/ScalarFunctionsTest.scala | 41 ++++++++++++++
 9 files changed, 91 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 515a36d..db7ffdb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -322,7 +322,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
 object FlinkTypeFactory {
 
-  def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = 
typeInfo match {
+  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName 
= typeInfo match {
       case BOOLEAN_TYPE_INFO => BOOLEAN
       case BYTE_TYPE_INFO => TINYINT
       case SHORT_TYPE_INFO => SMALLINT

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
index fe72733..140c7d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ExtractCallGen.scala
@@ -21,32 +21,29 @@ package org.apache.flink.table.codegen.calls
 import java.lang.reflect.Method
 
 import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
 import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
 
 class ExtractCallGen(returnType: TypeInformation[_], method: Method)
   extends MethodCallGen(returnType, method) {
 
   override def generate(codeGenerator: CodeGenerator, operands: 
Seq[GeneratedExpression])
   : GeneratedExpression = {
-    val unit = getEnum(operands(0)).asInstanceOf[TimeUnitRange].startUnit
-    val sqlTypeName = 
FlinkTypeFactory.typeInfoToSqlTypeName(operands(1).resultType)
+    val unit = getEnum(operands.head).asInstanceOf[TimeUnitRange].startUnit
+    val tpe = operands(1).resultType
     unit match {
       case TimeUnit.YEAR |
            TimeUnit.MONTH |
            TimeUnit.DAY |
            TimeUnit.QUARTER |
-           TimeUnit.DOW |
            TimeUnit.DOY |
            TimeUnit.WEEK |
            TimeUnit.CENTURY |
-           TimeUnit.MILLENNIUM=>
-        sqlTypeName match {
-          case SqlTypeName.TIMESTAMP =>
+           TimeUnit.MILLENNIUM =>
+        tpe match {
+          case SqlTimeTypeInfo.TIMESTAMP =>
             return generateCallIfArgsNotNull(codeGenerator.nullCheck, 
returnType, operands) {
               (terms) =>
                 s"""
@@ -55,7 +52,7 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method)
                    |""".stripMargin
             }
 
-          case SqlTypeName.DATE =>
+          case SqlTimeTypeInfo.DATE =>
             return super.generate(codeGenerator, operands)
 
           case _ => // do nothing
@@ -69,7 +66,7 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method)
         unit match {
           case TimeUnit.QUARTER =>
             s"""
-               |((${terms(1)} % ${factor}) - 1) / 
${unit.multiplier.intValue()} + 1
+               |((${terms(1)} % $factor) - 1) / ${unit.multiplier.intValue()} 
+ 1
                |""".stripMargin
           case _ =>
             if (factor == 1) {
@@ -78,7 +75,7 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method)
                  |""".stripMargin
             } else {
               s"""
-                 |(${terms(1)} % ${factor}) / ${unit.multiplier.intValue()}
+                 |(${terms(1)} % $factor) / ${unit.multiplier.intValue()}
                  |""".stripMargin
             }
         }
@@ -101,10 +98,10 @@ class ExtractCallGen(returnType: TypeInformation[_], method: Method)
       case TimeUnit.QUARTER =>
         TimeUnit.YEAR.multiplier.longValue()
       case TimeUnit.YEAR |
-           TimeUnit.DECADE |
            TimeUnit.CENTURY |
            TimeUnit.MILLENNIUM => 1L
-      case _ => throw new CodeGenException("unit %s is NOT 
supported.".format(unit))
+      case _ =>
+        throw new CodeGenException(s"Unit '$unit' is not supported.")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 9cd67c8..2cd7388 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.codegen.calls
 
 import java.lang.reflect.Method
 
-import org.apache.calcite.avatica.SqlType
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
@@ -445,7 +444,6 @@ object FunctionGenerator {
     Seq(new GenericTypeInfo(classOf[TimeUnitRange]), 
SqlTimeTypeInfo.TIMESTAMP),
     new ExtractCallGen(LONG_TYPE_INFO, BuiltInMethod.UNIX_DATE_EXTRACT.method))
 
-
   addSqlFunction(
     EXTRACT,
     Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.TIME),

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 08abc8f..013c8ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -22,15 +22,11 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float =
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexBuilder, RexNode}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
-import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 
 object ExpressionUtils {
 
@@ -137,57 +133,4 @@ object ExpressionUtils {
         }
     }
   }
-
-  // 
----------------------------------------------------------------------------------------------
-  // RexNode conversion functions (see 
org.apache.calcite.sql2rel.StandardConvertletTable)
-  // 
----------------------------------------------------------------------------------------------
-
-  /**
-    * Copy of 
[[org.apache.calcite.sql2rel.StandardConvertletTable#getFactor()]].
-    */
-  private[flink] def getFactor(unit: TimeUnit): JBigDecimal = unit match {
-    case TimeUnit.DAY => java.math.BigDecimal.ONE
-    case TimeUnit.HOUR => TimeUnit.DAY.multiplier
-    case TimeUnit.MINUTE => TimeUnit.HOUR.multiplier
-    case TimeUnit.SECOND => TimeUnit.MINUTE.multiplier
-    case TimeUnit.YEAR => java.math.BigDecimal.ONE
-    case TimeUnit.MONTH => TimeUnit.YEAR.multiplier
-    case _ => throw new IllegalArgumentException("Invalid start unit.")
-  }
-
-  /**
-    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#mod()]].
-    */
-  private[flink] def mod(
-      rexBuilder: RexBuilder,
-      resType: RelDataType,
-      res: RexNode,
-      value: JBigDecimal)
-    : RexNode = {
-    if (value == JBigDecimal.ONE) return res
-    rexBuilder.makeCall(SqlStdOperatorTable.MOD, res, 
rexBuilder.makeExactLiteral(value, resType))
-  }
-
-  /**
-    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#divide()]].
-    */
-  private[flink] def divide(rexBuilder: RexBuilder, res: RexNode, value: 
JBigDecimal): RexNode = {
-    if (value == JBigDecimal.ONE) return res
-    if (value.compareTo(JBigDecimal.ONE) < 0 && value.signum == 1) {
-      try {
-        val reciprocal = JBigDecimal.ONE.divide(value, 
JBigDecimal.ROUND_UNNECESSARY)
-        return rexBuilder.makeCall(
-          SqlStdOperatorTable.MULTIPLY,
-          res,
-          rexBuilder.makeExactLiteral(reciprocal))
-      } catch {
-        case e: ArithmeticException => // ignore
-      }
-    }
-    rexBuilder.makeCall(
-      SqlStdOperatorTable.DIVIDE_INTEGER,
-      res,
-      rexBuilder.makeExactLiteral(value))
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index 51526b2..47d1519 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -56,7 +56,7 @@ case class Sum(child: Expression) extends Aggregation {
   override def toString = s"sum($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -77,7 +77,7 @@ case class Sum0(child: Expression) extends Aggregation {
   override def toString = s"sum0($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -86,7 +86,7 @@ case class Sum0(child: Expression) extends Aggregation {
     TypeCheckUtils.assertNumericExpr(child.resultType, "sum0")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) =
-    new SqlSumEmptyIsZeroAggFunction()
+    SqlStdOperatorTable.SUM0
 }
 
 case class Min(child: Expression) extends Aggregation {
@@ -94,7 +94,7 @@ case class Min(child: Expression) extends Aggregation {
   override def toString = s"min($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -103,7 +103,7 @@ case class Min(child: Expression) extends Aggregation {
     TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) = {
-    new SqlMinMaxAggFunction(MIN)
+    SqlStdOperatorTable.MIN
   }
 }
 
@@ -112,7 +112,7 @@ case class Max(child: Expression) extends Aggregation {
   override def toString = s"max($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -121,7 +121,7 @@ case class Max(child: Expression) extends Aggregation {
     TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) = {
-    new SqlMinMaxAggFunction(MAX)
+    SqlStdOperatorTable.MAX
   }
 }
 
@@ -130,13 +130,13 @@ case class Count(child: Expression) extends Aggregation {
   override def toString = s"count($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) = {
-    new SqlCountAggFunction("COUNT")
+    SqlStdOperatorTable.COUNT
   }
 }
 
@@ -145,7 +145,7 @@ case class Avg(child: Expression) extends Aggregation {
   override def toString = s"avg($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -154,7 +154,7 @@ case class Avg(child: Expression) extends Aggregation {
     TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) = {
-    new SqlAvgAggFunction(AVG)
+    SqlStdOperatorTable.AVG
   }
 }
 
@@ -163,7 +163,8 @@ case class StddevPop(child: Expression) extends Aggregation {
   override def toString = s"stddev_pop($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, false, null, 
name, child.toRexNode)
+    relBuilder.aggregateCall(
+      SqlStdOperatorTable.STDDEV_POP, false, false, null, name, 
child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -172,7 +173,7 @@ case class StddevPop(child: Expression) extends Aggregation {
     TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) =
-    new SqlAvgAggFunction(STDDEV_POP)
+    SqlStdOperatorTable.STDDEV_POP
 }
 
 case class StddevSamp(child: Expression) extends Aggregation {
@@ -180,7 +181,8 @@ case class StddevSamp(child: Expression) extends Aggregation {
   override def toString = s"stddev_samp($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_SAMP, false, null, 
name, child.toRexNode)
+    relBuilder.aggregateCall(
+      SqlStdOperatorTable.STDDEV_SAMP, false, false, null, name, 
child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -189,7 +191,7 @@ case class StddevSamp(child: Expression) extends Aggregation {
     TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_samp")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) =
-    new SqlAvgAggFunction(STDDEV_SAMP)
+    SqlStdOperatorTable.STDDEV_SAMP
 }
 
 case class VarPop(child: Expression) extends Aggregation {
@@ -197,7 +199,7 @@ case class VarPop(child: Expression) extends Aggregation {
   override def toString = s"var_pop($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, false, false, null, 
name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -206,7 +208,7 @@ case class VarPop(child: Expression) extends Aggregation {
     TypeCheckUtils.assertNumericExpr(child.resultType, "var_pop")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) =
-    new SqlAvgAggFunction(VAR_POP)
+    SqlStdOperatorTable.VAR_POP
 }
 
 case class VarSamp(child: Expression) extends Aggregation {
@@ -214,7 +216,8 @@ case class VarSamp(child: Expression) extends Aggregation {
   override def toString = s"var_samp($child)"
 
   override private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, false, null, name, 
child.toRexNode)
+    relBuilder.aggregateCall(
+      SqlStdOperatorTable.VAR_SAMP, false, false, null, name, child.toRexNode)
   }
 
   override private[flink] def resultType = child.resultType
@@ -223,7 +226,7 @@ case class VarSamp(child: Expression) extends Aggregation {
     TypeCheckUtils.assertNumericExpr(child.resultType, "var_samp")
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) =
-    new SqlAvgAggFunction(VAR_SAMP)
+    SqlStdOperatorTable.VAR_SAMP
 }
 
 case class AggFunctionCall(
@@ -254,10 +257,11 @@ case class AggFunctionCall(
     }
   }
 
-  override def toString(): String = 
s"${aggregateFunction.getClass.getSimpleName}($args)"
+  override def toString: String = 
s"${aggregateFunction.getClass.getSimpleName}($args)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
-    relBuilder.aggregateCall(this.getSqlAggFunction(), false, null, name, 
args.map(_.toRexNode): _*)
+    relBuilder.aggregateCall(
+      this.getSqlAggFunction(), false, false, null, name, 
args.map(_.toRexNode): _*)
   }
 
   override private[flink] def getSqlAggFunction()(implicit relBuilder: 
RelBuilder) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
index 0d71fb2..4faf8d3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
@@ -34,7 +34,7 @@ case class SymbolExpression(symbol: TableSymbol) extends LeafExpression {
   override private[flink] def resultType: TypeInformation[_] =
     throw new UnsupportedOperationException("This should not happen. A symbol 
has no result type.")
 
-  def toExpr = this // triggers implicit conversion
+  def toExpr: SymbolExpression = this // triggers implicit conversion
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
     // dirty hack to pass Java enums to Java from Scala

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 5d75cd4..f231343 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -18,16 +18,13 @@
 
 package org.apache.flink.table.expressions
 
-import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.SqlFunctionCategory
-import org.apache.calcite.sql.`type`.{SqlOperandTypeChecker, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.calcite.FlinkRelBuilder
-import org.apache.flink.table.expressions.ExpressionUtils.{divide, getFactor, 
mod}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
@@ -75,33 +72,11 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
   override def toString: String = s"($temporal).extract($timeIntervalUnit)"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-    // get wrapped Calcite unit
-    val timeUnitRange = timeIntervalUnit
-      .asInstanceOf[SymbolExpression]
-      .symbol
-      .enum
-      .asInstanceOf[TimeUnitRange]
-
-    relBuilder.getRexBuilder
-    // convert RexNodes
-    convertFunction(
-      timeIntervalUnit.toRexNode,
-      temporal.toRexNode,
-      relBuilder.asInstanceOf[FlinkRelBuilder])
-  }
-
-  // Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#convertFunction()]]
-  private def convertFunction(timeUnitRangeRexNode: RexNode,
-                              temporal: RexNode,
-                              relBuilder: FlinkRelBuilder): RexNode = {
-    val rexBuilder = relBuilder.getRexBuilder
-    val resultType = relBuilder
-      .getTypeFactory()
-      .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
-    rexBuilder.makeCall(
-      resultType,
-      SqlStdOperatorTable.EXTRACT,
-      Seq(timeUnitRangeRexNode, temporal))
+    relBuilder
+      .getRexBuilder
+      .makeCall(
+        SqlStdOperatorTable.EXTRACT,
+        Seq(timeIntervalUnit.toRexNode, temporal.toRexNode))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index 5bdd9dc..c2fa647 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -235,8 +235,6 @@ class CalcTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }
 
-  // As stated in https://issues.apache.org/jira/browse/CALCITE-1584, Calcite 
planner doesn't
-  // promise to retain field names.
   @Test
   def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
     val util = batchTestUtil()
@@ -251,6 +249,8 @@ class CalcTest extends TableTestBase {
           unaryNode(
             "DataSetCalc",
             batchTableNode(0),
+            // As stated in https://issues.apache.org/jira/browse/CALCITE-1584
+            // Calcite planner doesn't promise to retain field names.
             term("select", "a", "c", "UPPER(c) AS $f2")
           ),
           term("groupBy", "$f2"),
@@ -262,8 +262,6 @@ class CalcTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }
 
-  // As stated in https://issues.apache.org/jira/browse/CALCITE-1584, Calcite 
planner doesn't
-  // promise to retain field names.
   @Test
   def testSelectFromGroupedTableWithFunctionKey(): Unit = {
     val util = batchTestUtil()
@@ -278,6 +276,8 @@ class CalcTest extends TableTestBase {
           unaryNode(
             "DataSetCalc",
             batchTableNode(0),
+            // As stated in https://issues.apache.org/jira/browse/CALCITE-1584
+            // Calcite planner doesn't promise to retain field names.
             term("select", "a", "c", "MyHashCode$(c) AS $f2")
           ),
           term("groupBy", "$f2"),

http://git-wip-us.apache.org/repos/asf/flink/blob/976d004c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 281dd90..d449fba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1425,6 +1425,47 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f20.extract(YEAR)",
       "EXTRACT(YEAR FROM f20)",
       "2")
+
+    // test SQL only time units
+    testSqlApi(
+      "EXTRACT(MILLENNIUM FROM f18)",
+      "2")
+
+    testSqlApi(
+      "EXTRACT(MILLENNIUM FROM f16)",
+      "2")
+
+    testSqlApi(
+      "EXTRACT(CENTURY FROM f18)",
+      "20")
+
+    testSqlApi(
+      "EXTRACT(CENTURY FROM f16)",
+      "20")
+
+    testSqlApi(
+      "EXTRACT(DOY FROM f18)",
+      "315")
+
+    testSqlApi(
+      "EXTRACT(DOY FROM f16)",
+      "315")
+
+    testSqlApi(
+      "EXTRACT(QUARTER FROM f18)",
+      "4")
+
+    testSqlApi(
+      "EXTRACT(QUARTER FROM f16)",
+      "4")
+
+    testSqlApi(
+      "EXTRACT(WEEK FROM f18)",
+      "45")
+
+    testSqlApi(
+      "EXTRACT(WEEK FROM f16)",
+      "45")
   }
 
   @Test

Reply via email to