This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5da6b3eff29 [FLINK-33407] Remove old expression stack leftovers for
time functions
5da6b3eff29 is described below
commit 5da6b3eff29c14931c86b2018331c40ed256420f
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Oct 31 10:22:54 2023 +0100
[FLINK-33407] Remove old expression stack leftovers for time functions
---
.../expressions/PlannerExpressionConverter.scala | 40 ---
.../flink/table/planner/expressions/time.scala | 309 ---------------------
2 files changed, 349 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 6a0d7b028ed..8983fbbd64a 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -147,46 +147,6 @@ class PlannerExpressionConverter private extends
ApiExpressionVisitor[PlannerExp
assert(args.size == 1)
Collect(args.head)
- case EXTRACT =>
- assert(args.size == 2)
- Extract(args.head, args.last)
-
- case CURRENT_DATE =>
- assert(args.isEmpty)
- CurrentDate()
-
- case CURRENT_TIME =>
- assert(args.isEmpty)
- CurrentTime()
-
- case CURRENT_TIMESTAMP =>
- assert(args.isEmpty)
- CurrentTimestamp()
-
- case LOCAL_TIME =>
- assert(args.isEmpty)
- LocalTime()
-
- case LOCAL_TIMESTAMP =>
- assert(args.isEmpty)
- LocalTimestamp()
-
- case TEMPORAL_OVERLAPS =>
- assert(args.size == 4)
- TemporalOverlaps(args.head, args(1), args(2), args.last)
-
- case DATE_FORMAT =>
- assert(args.size == 2)
- DateFormat(args.head, args.last)
-
- case TIMESTAMP_DIFF =>
- assert(args.size == 3)
- TimestampDiff(args.head, args(1), args.last)
-
- case TO_TIMESTAMP_LTZ =>
- assert(args.size == 2)
- ToTimestampLtz(args.head, args.last)
-
case AT =>
assert(args.size == 2)
ItemAt(args.head, args.last)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
deleted file mode 100644
index 19295dd36aa..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo,
SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils
-import
org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.isTimeInterval
-import org.apache.flink.table.planner.validate.{ValidationFailure,
ValidationResult, ValidationSuccess}
-import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-import org.apache.calcite.rex._
-
-case class Extract(timeIntervalUnit: PlannerExpression, temporal:
PlannerExpression)
- extends PlannerExpression {
-
- override private[flink] def children: Seq[PlannerExpression] =
timeIntervalUnit :: temporal :: Nil
-
- override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeInfoCheckUtils.isTemporal(temporal.resultType)) {
- return ValidationFailure(
- s"Extract operator requires Temporal input, " +
- s"but $temporal is of type ${temporal.resultType}")
- }
-
- timeIntervalUnit match {
- case SymbolPlannerExpression(PlannerTimeIntervalUnit.YEAR) |
SymbolPlannerExpression(
- PlannerTimeIntervalUnit.QUARTER) | SymbolPlannerExpression(
- PlannerTimeIntervalUnit.MONTH) |
SymbolPlannerExpression(PlannerTimeIntervalUnit.WEEK) |
- SymbolPlannerExpression(PlannerTimeIntervalUnit.DAY)
- if temporal.resultType == SqlTimeTypeInfo.DATE
- || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
- || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE
- || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
- || temporal.resultType.isInstanceOf[LegacyLocalDateTimeTypeInfo]
- || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
- || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
- ValidationSuccess
-
- case SymbolPlannerExpression(PlannerTimeIntervalUnit.HOUR) |
SymbolPlannerExpression(
- PlannerTimeIntervalUnit.MINUTE) | SymbolPlannerExpression(
- PlannerTimeIntervalUnit.SECOND)
- if temporal.resultType == SqlTimeTypeInfo.TIME
- || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
- || temporal.resultType == LocalTimeTypeInfo.LOCAL_TIME
- || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
- || temporal.resultType.isInstanceOf[LegacyLocalDateTimeTypeInfo]
- || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
- ValidationSuccess
-
- case _ =>
- ValidationFailure(
- s"Extract operator does not support unit '$timeIntervalUnit' for
input" +
- s" of type '${temporal.resultType}'.")
- }
- }
-
- override def toString: String = s"($temporal).extract($timeIntervalUnit)"
-}
-
-abstract class CurrentTimePoint(targetType: TypeInformation[_], local: Boolean)
- extends LeafExpression {
-
- override private[flink] def resultType: TypeInformation[_] = targetType
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeInfoCheckUtils.isTimePoint(targetType)) {
- ValidationFailure(
- s"CurrentTimePoint operator requires Time Point target type, " +
- s"but get $targetType.")
- } else if (local && targetType == SqlTimeTypeInfo.DATE) {
- ValidationFailure(
- s"Localized CurrentTimePoint operator requires Time or Timestamp
target " +
- s"type, but get $targetType.")
- } else {
- ValidationSuccess
- }
- }
-
- override def toString: String = if (local) {
- s"local$targetType()"
- } else {
- s"current$targetType()"
- }
-}
-
-case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local
= false)
-
-case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local
= false)
-
-case class CurrentTimestamp() extends
CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
-
-case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local =
true)
-
-case class LocalTimestamp() extends
CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
-
-/** Determines whether two anchored time intervals overlap. */
-case class TemporalOverlaps(
- leftTimePoint: PlannerExpression,
- leftTemporal: PlannerExpression,
- rightTimePoint: PlannerExpression,
- rightTemporal: PlannerExpression)
- extends PlannerExpression {
-
- override private[flink] def children: Seq[PlannerExpression] =
- Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
-
- override private[flink] def resultType: TypeInformation[_] =
BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeInfoCheckUtils.isTimePoint(leftTimePoint.resultType)) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires leftTimePoint to be of type " +
- s"Time Point, but get ${leftTimePoint.resultType}.")
- }
- if (!TypeInfoCheckUtils.isTimePoint(rightTimePoint.resultType)) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires rightTimePoint to be of " +
- s"type Time Point, but get ${rightTimePoint.resultType}.")
- }
- if (leftTimePoint.resultType != rightTimePoint.resultType) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires leftTimePoint and " +
- s"rightTimePoint to be of same type.")
- }
-
- // leftTemporal is point, then it must be comparable with leftTimePoint
- if (TypeInfoCheckUtils.isTimePoint(leftTemporal.resultType)) {
- if (leftTemporal.resultType != leftTimePoint.resultType) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires leftTemporal and " +
- s"leftTimePoint to be of same type if leftTemporal is of type Time
Point.")
- }
- } else if (!isTimeInterval(leftTemporal.resultType)) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires leftTemporal to be of " +
- s"type Time Point or Time Interval.")
- }
-
- // rightTemporal is point, then it must be comparable with rightTimePoint
- if (TypeInfoCheckUtils.isTimePoint(rightTemporal.resultType)) {
- if (rightTemporal.resultType != rightTimePoint.resultType) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires rightTemporal and " +
- s"rightTimePoint to be of same type if rightTemporal is of type
Time Point.")
- }
- } else if (!isTimeInterval(rightTemporal.resultType)) {
- return ValidationFailure(
- s"TemporalOverlaps operator requires rightTemporal to be of " +
- s"type Time Point or Time Interval.")
- }
- ValidationSuccess
- }
-
- override def toString: String = s"temporalOverlaps(${children.mkString(",
")})"
-
- /**
- * Standard conversion of the OVERLAPS operator. Source:
- * [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
- */
- private def convertOverlaps(
- leftP: RexNode,
- leftT: RexNode,
- rightP: RexNode,
- rightT: RexNode,
- relBuilder: FlinkRelBuilder): RexNode = {
- val convLeftT = convertOverlapsEnd(relBuilder, leftP, leftT,
leftTemporal.resultType)
- val convRightT = convertOverlapsEnd(relBuilder, rightP, rightT,
rightTemporal.resultType)
-
- // sort end points into start and end, such that (s0 <= e0) and (s1 <= e1).
- val (s0, e0) = buildSwap(relBuilder, leftP, convLeftT)
- val (s1, e1) = buildSwap(relBuilder, rightP, convRightT)
-
- // (e0 >= s1) AND (e1 >= s0)
- val leftPred =
relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e0, s1)
- val rightPred =
relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e1, s0)
- relBuilder.call(FlinkSqlOperatorTable.AND, leftPred, rightPred)
- }
-
- private def convertOverlapsEnd(
- relBuilder: FlinkRelBuilder,
- start: RexNode,
- end: RexNode,
- endType: TypeInformation[_]) = {
- if (isTimeInterval(endType)) {
- relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, start, end)
- } else {
- end
- }
- }
-
- private def buildSwap(relBuilder: FlinkRelBuilder, start: RexNode, end:
RexNode) = {
- val le = relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, start,
end)
- val l = relBuilder.call(FlinkSqlOperatorTable.CASE, le, start, end)
- val r = relBuilder.call(FlinkSqlOperatorTable.CASE, le, end, start)
- (l, r)
- }
-}
-
-case class DateFormat(timestamp: PlannerExpression, format: PlannerExpression)
- extends PlannerExpression {
- override private[flink] def children = timestamp :: format :: Nil
-
- override def toString: String = s"$timestamp.dateFormat($format)"
-
- override private[flink] def resultType = STRING_TYPE_INFO
-}
-
-case class TimestampDiff(
- timePointUnit: PlannerExpression,
- timePoint1: PlannerExpression,
- timePoint2: PlannerExpression)
- extends PlannerExpression {
-
- override private[flink] def children: Seq[PlannerExpression] =
- timePointUnit :: timePoint1 :: timePoint2 :: Nil
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeInfoCheckUtils.isTimePoint(timePoint1.resultType)) {
- return ValidationFailure(
- s"$this requires an input time point type, " +
- s"but timePoint1 is of type '${timePoint1.resultType}'.")
- }
-
- if (!TypeInfoCheckUtils.isTimePoint(timePoint2.resultType)) {
- return ValidationFailure(
- s"$this requires an input time point type, " +
- s"but timePoint2 is of type '${timePoint2.resultType}'.")
- }
-
- timePointUnit match {
- case SymbolPlannerExpression(PlannerTimePointUnit.YEAR) |
SymbolPlannerExpression(
- PlannerTimePointUnit.QUARTER) |
SymbolPlannerExpression(PlannerTimePointUnit.MONTH) |
- SymbolPlannerExpression(PlannerTimePointUnit.WEEK) |
SymbolPlannerExpression(
- PlannerTimePointUnit.DAY) |
SymbolPlannerExpression(PlannerTimePointUnit.HOUR) |
- SymbolPlannerExpression(PlannerTimePointUnit.MINUTE) |
SymbolPlannerExpression(
- PlannerTimePointUnit.SECOND)
- if timePoint1.resultType == SqlTimeTypeInfo.DATE
- || timePoint1.resultType == SqlTimeTypeInfo.TIMESTAMP
- || timePoint2.resultType == SqlTimeTypeInfo.DATE
- || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP
- || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE
- || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
- || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE
- || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME =>
- ValidationSuccess
-
- case _ =>
- ValidationFailure(
- s"$this operator does not support unit '$timePointUnit'" +
- s" for input of type ('${timePoint1.resultType}',
'${timePoint2.resultType}').")
- }
- }
-
- override def toString: String = s"timestampDiff(${children.mkString(", ")})"
-
- override private[flink] def resultType = INT_TYPE_INFO
-}
-
-case class ToTimestampLtz(numericEpochTime: PlannerExpression, precision:
PlannerExpression)
- extends PlannerExpression {
-
- override private[flink] def children: Seq[PlannerExpression] =
- numericEpochTime :: precision :: Nil
-
- override private[flink] def validateInput(): ValidationResult = {
- if (
- TypeInfoCheckUtils.assertNumericExpr(numericEpochTime.resultType,
"toTimestampLtz").isFailure
- ) {
- return ValidationFailure(
- s"$this requires numeric type for the first input, " +
- s"but the actual type '${numericEpochTime.resultType}'.")
- }
- if (
- TypeInfoCheckUtils
- .assertNumericExpr(precision.resultType, "toTimestampLtz")
- .isFailure
- ) {
- return ValidationFailure(
- s"$this requires numeric type for the second input, " +
- s"but the actual type '${precision.resultType}'.")
- }
- ValidationSuccess
- }
-
- override def toString: String = s"toTimestampLtz(${children.mkString(", ")})"
-
- override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
-}