This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d1fa236 [FLINK-11449][table-planner] Represent built-in expressions
as planner expressions
d1fa236 is described below
commit d1fa236bf3c8fcb49868e17d69c3578f47940baf
Author: sunjincheng121 <[email protected]>
AuthorDate: Wed Feb 20 20:09:41 2019 +0800
[FLINK-11449][table-planner] Represent built-in expressions as planner
expressions
This commit introduces the abstract class `PlannerExpression`, which extends
`Expression`, and lets built-in expressions extend it instead of `Expression`.
---
.../scala/org/apache/flink/table/api/windows.scala | 8 ++++----
.../flink/table/expressions/Expression.scala | 8 +++++---
.../flink/table/expressions/InputTypeSpec.scala | 2 +-
.../flink/table/expressions/aggregations.scala | 2 +-
.../org/apache/flink/table/expressions/call.scala | 10 ++++-----
.../flink/table/expressions/collection.scala | 12 +++++------
.../flink/table/expressions/comparison.scala | 2 +-
.../flink/table/expressions/fieldExpression.scala | 2 +-
.../org/apache/flink/table/expressions/logic.scala | 2 +-
.../flink/table/expressions/mathExpressions.scala | 11 ++++++----
.../table/expressions/stringExpressions.scala | 24 +++++++++++-----------
.../apache/flink/table/expressions/subquery.scala | 2 +-
.../org/apache/flink/table/expressions/time.scala | 10 ++++-----
13 files changed, 50 insertions(+), 45 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
index ee022b1..52e3b67 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -33,7 +33,7 @@ case class OverWindow(
private[flink] val preceding: Expression,
private[flink] val following: Expression)
-case class CurrentRow() extends Expression {
+case class CurrentRow() extends PlannerExpression {
override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
override private[flink] def children = Seq()
@@ -41,7 +41,7 @@ case class CurrentRow() extends Expression {
override def toString = "CURRENT ROW"
}
-case class CurrentRange() extends Expression {
+case class CurrentRange() extends PlannerExpression {
override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
override private[flink] def children = Seq()
@@ -49,7 +49,7 @@ case class CurrentRange() extends Expression {
override def toString = "CURRENT RANGE"
}
-case class UnboundedRow() extends Expression {
+case class UnboundedRow() extends PlannerExpression {
override private[flink] def resultType = RowIntervalTypeInfo.INTERVAL_ROWS
override private[flink] def children = Seq()
@@ -57,7 +57,7 @@ case class UnboundedRow() extends Expression {
override def toString = "UNBOUNDED ROW"
}
-case class UnboundedRange() extends Expression {
+case class UnboundedRange() extends PlannerExpression {
override private[flink] def resultType = TimeIntervalTypeInfo.INTERVAL_MILLIS
override private[flink] def children = Seq()
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/Expression.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/Expression.scala
index 14d899d..ebfca92 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/Expression.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/Expression.scala
@@ -72,17 +72,19 @@ abstract class Expression extends TreeNode[Expression] {
}
}
-abstract class BinaryExpression extends Expression {
+abstract class PlannerExpression extends Expression
+
+abstract class BinaryExpression extends PlannerExpression {
private[flink] def left: Expression
private[flink] def right: Expression
private[flink] def children = Seq(left, right)
}
-abstract class UnaryExpression extends Expression {
+abstract class UnaryExpression extends PlannerExpression {
private[flink] def child: Expression
private[flink] def children = Seq(child)
}
-abstract class LeafExpression extends Expression {
+abstract class LeafExpression extends PlannerExpression {
private[flink] val children = Nil
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
index 9b2d20f..3506e9a 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.validate._
/**
* Expressions that have strict data type specification on its inputs.
*/
-trait InputTypeSpec extends Expression {
+trait InputTypeSpec extends PlannerExpression {
/**
* Input type specification for each child.
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index f6ea4bd..afbc756 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult,
ValidationSuccess}
-abstract sealed class Aggregation extends Expression {
+abstract sealed class Aggregation extends PlannerExpression {
override def toString = s"Aggregate"
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
index bd02791..de423c9 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -41,7 +41,7 @@ import _root_.scala.collection.JavaConverters._
* General expression for unresolved function calls. The function can be a
built-in
* scalar function or a user-defined scalar function.
*/
-case class Call(functionName: String, args: Seq[Expression]) extends
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends
PlannerExpression {
override private[flink] def children: Seq[Expression] = args
@@ -64,7 +64,7 @@ case class Call(functionName: String, args: Seq[Expression])
extends Expression
* @param agg The aggregation of the over call.
* @param alias The alias of the referenced over window.
*/
-case class UnresolvedOverCall(agg: Expression, alias: Expression) extends
Expression {
+case class UnresolvedOverCall(agg: Expression, alias: Expression) extends
PlannerExpression {
override private[flink] def validateInput() =
ValidationFailure(s"Over window with alias $alias could not be resolved.")
@@ -88,7 +88,7 @@ case class OverCall(
partitionBy: Seq[Expression],
orderBy: Expression,
preceding: Expression,
- following: Expression) extends Expression {
+ following: Expression) extends PlannerExpression {
override def toString: String = s"$agg OVER (" +
s"PARTITION BY (${partitionBy.mkString(", ")}) " +
@@ -261,7 +261,7 @@ case class OverCall(
case class ScalarFunctionCall(
scalarFunction: ScalarFunction,
parameters: Seq[Expression])
- extends Expression {
+ extends PlannerExpression {
private var foundSignature: Option[Array[Class[_]]] = None
@@ -314,7 +314,7 @@ case class TableFunctionCall(
tableFunction: TableFunction[_],
parameters: Seq[Expression],
resultType: TypeInformation[_])
- extends Expression {
+ extends PlannerExpression {
private var aliases: Option[Seq[String]] = None
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/collection.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/collection.scala
index 951ae27..83bf6bd 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/collection.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.validate.{ValidationFailure,
ValidationResult, Val
import scala.collection.JavaConverters._
-case class RowConstructor(elements: Seq[Expression]) extends Expression {
+case class RowConstructor(elements: Seq[Expression]) extends PlannerExpression
{
override private[flink] def children: Seq[Expression] = elements
@@ -59,7 +59,7 @@ case class RowConstructor(elements: Seq[Expression]) extends
Expression {
}
}
-case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
+case class ArrayConstructor(elements: Seq[Expression]) extends
PlannerExpression {
override private[flink] def children: Seq[Expression] = elements
@@ -91,7 +91,7 @@ case class ArrayConstructor(elements: Seq[Expression])
extends Expression {
}
}
-case class MapConstructor(elements: Seq[Expression]) extends Expression {
+case class MapConstructor(elements: Seq[Expression]) extends PlannerExpression
{
override private[flink] def children: Seq[Expression] = elements
override private[flink] def toRexNode(implicit relBuilder: RelBuilder):
RexNode = {
@@ -132,7 +132,7 @@ case class MapConstructor(elements: Seq[Expression])
extends Expression {
}
}
-case class ArrayElement(array: Expression) extends Expression {
+case class ArrayElement(array: Expression) extends PlannerExpression {
override private[flink] def children: Seq[Expression] = Seq(array)
@@ -158,7 +158,7 @@ case class ArrayElement(array: Expression) extends
Expression {
}
}
-case class Cardinality(container: Expression) extends Expression {
+case class Cardinality(container: Expression) extends PlannerExpression {
override private[flink] def children: Seq[Expression] = Seq(container)
@@ -181,7 +181,7 @@ case class Cardinality(container: Expression) extends
Expression {
}
}
-case class ItemAt(container: Expression, key: Expression) extends Expression {
+case class ItemAt(container: Expression, key: Expression) extends
PlannerExpression {
override private[flink] def children: Seq[Expression] = Seq(container, key)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/comparison.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/comparison.scala
index 847d3e7..2bfacb7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/comparison.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -168,7 +168,7 @@ abstract class BetweenComparison(
expr: Expression,
lowerBound: Expression,
upperBound: Expression)
- extends Expression {
+ extends PlannerExpression {
override private[flink] def resultType: TypeInformation[_] =
BasicTypeInfo.BOOLEAN_TYPE_INFO
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index 9721e62..ef5ab84 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -27,7 +27,7 @@ import
org.apache.flink.table.functions.sql.StreamRecordTimestampSqlFunction
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult,
ValidationSuccess}
-trait NamedExpression extends Expression {
+trait NamedExpression extends PlannerExpression {
private[flink] def name: String
private[flink] def toAttribute: Attribute
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/logic.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/logic.scala
index dfe00cc..17a09e9 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/logic.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/logic.scala
@@ -80,7 +80,7 @@ case class If(
condition: Expression,
ifTrue: Expression,
ifFalse: Expression)
- extends Expression {
+ extends PlannerExpression {
private[flink] def children = Seq(condition, ifTrue, ifFalse)
override private[flink] def resultType = ifTrue.resultType
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index da214f9..74179c9 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -118,7 +118,8 @@ case class Cosh(child: Expression) extends UnaryExpression {
override def toString = s"cosh($child)"
}
-case class Log(base: Expression, antilogarithm: Expression) extends Expression
with InputTypeSpec {
+case class Log(base: Expression, antilogarithm: Expression)
+ extends PlannerExpression with InputTypeSpec {
def this(antilogarithm: Expression) = this(null, antilogarithm)
override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
@@ -394,7 +395,7 @@ case class E() extends LeafExpression {
}
}
-case class Rand(seed: Expression) extends Expression with InputTypeSpec {
+case class Rand(seed: Expression) extends PlannerExpression with InputTypeSpec
{
def this() = this(null)
@@ -423,7 +424,8 @@ case class Rand(seed: Expression) extends Expression with
InputTypeSpec {
}
}
-case class RandInteger(seed: Expression, bound: Expression) extends Expression
with InputTypeSpec {
+case class RandInteger(seed: Expression, bound: Expression)
+ extends PlannerExpression with InputTypeSpec {
def this(bound: Expression) = this(null, bound)
@@ -494,7 +496,8 @@ case class UUID() extends LeafExpression {
}
}
-case class Truncate(base: Expression, num: Expression) extends Expression with
InputTypeSpec {
+case class Truncate(base: Expression, num: Expression)
+ extends PlannerExpression with InputTypeSpec {
def this(base: Expression) = this(base, null)
override private[flink] def resultType: TypeInformation[_] = base.resultType
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index d65de86..4dd06cd 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -151,7 +151,7 @@ case class Similar(str: Expression, pattern: Expression)
extends BinaryExpressio
case class Substring(
str: Expression,
begin: Expression,
- length: Expression) extends Expression with InputTypeSpec {
+ length: Expression) extends PlannerExpression with InputTypeSpec {
def this(str: Expression, begin: Expression) = this(str, begin,
CharLength(str))
@@ -175,7 +175,7 @@ case class Substring(
case class Trim(
trimMode: Expression,
trimString: Expression,
- str: Expression) extends Expression {
+ str: Expression) extends PlannerExpression {
override private[flink] def children: Seq[Expression] = trimMode ::
trimString :: str :: Nil
@@ -230,7 +230,7 @@ case class Upper(child: Expression) extends UnaryExpression
with InputTypeSpec {
* Returns the position of string needle in string haystack.
*/
case class Position(needle: Expression, haystack: Expression)
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
override private[flink] def children: Seq[Expression] = Seq(needle, haystack)
@@ -255,7 +255,7 @@ case class Overlay(
replacement: Expression,
starting: Expression,
position: Expression)
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
def this(str: Expression, replacement: Expression, starting: Expression) =
this(str, replacement, starting, CharLength(replacement))
@@ -284,7 +284,7 @@ case class Overlay(
* Returns the string that results from concatenating the arguments.
* Returns NULL if any argument is NULL.
*/
-case class Concat(strings: Seq[Expression]) extends Expression with
InputTypeSpec {
+case class Concat(strings: Seq[Expression]) extends PlannerExpression with
InputTypeSpec {
override private[flink] def children: Seq[Expression] = strings
@@ -308,7 +308,7 @@ case class Concat(strings: Seq[Expression]) extends
Expression with InputTypeSpe
* values after the separator argument.
**/
case class ConcatWs(separator: Expression, strings: Seq[Expression])
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
override private[flink] def children: Seq[Expression] = Seq(separator) ++
strings
@@ -325,7 +325,7 @@ case class ConcatWs(separator: Expression, strings:
Seq[Expression])
}
case class Lpad(text: Expression, len: Expression, pad: Expression)
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
override private[flink] def children: Seq[Expression] = Seq(text, len, pad)
@@ -342,7 +342,7 @@ case class Lpad(text: Expression, len: Expression, pad:
Expression)
}
case class Rpad(text: Expression, len: Expression, pad: Expression)
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
override private[flink] def children: Seq[Expression] = Seq(text, len, pad)
@@ -363,7 +363,7 @@ case class Rpad(text: Expression, len: Expression, pad:
Expression)
* being replaced.
*/
case class RegexpReplace(str: Expression, regex: Expression, replacement:
Expression)
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
override private[flink] def resultType: TypeInformation[_] =
BasicTypeInfo.STRING_TYPE_INFO
@@ -386,7 +386,7 @@ case class RegexpReplace(str: Expression, regex:
Expression, replacement: Expres
* Returns a string extracted with a specified regular expression and a regex
match group index.
*/
case class RegexpExtract(str: Expression, regex: Expression, extractIndex:
Expression)
- extends Expression with InputTypeSpec {
+ extends PlannerExpression with InputTypeSpec {
def this(str: Expression, regex: Expression) = this(str, regex, null)
override private[flink] def resultType: TypeInformation[_] =
BasicTypeInfo.STRING_TYPE_INFO
@@ -526,7 +526,7 @@ case class RTrim(child: Expression) extends UnaryExpression
with InputTypeSpec {
/**
* Returns a string that repeats the base str n times.
*/
-case class Repeat(str: Expression, n: Expression) extends Expression with
InputTypeSpec {
+case class Repeat(str: Expression, n: Expression) extends PlannerExpression
with InputTypeSpec {
override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
@@ -557,7 +557,7 @@ case class Repeat(str: Expression, n: Expression) extends
Expression with InputT
*/
case class Replace(str: Expression,
search: Expression,
- replacement: Expression) extends Expression with InputTypeSpec {
+ replacement: Expression) extends PlannerExpression with InputTypeSpec {
def this(str: Expression, begin: Expression) = this(str, begin,
CharLength(str))
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala
index 9352365..e47d57e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/subquery.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult,
ValidationSuccess}
-case class In(expression: Expression, elements: Seq[Expression]) extends
Expression {
+case class In(expression: Expression, elements: Seq[Expression]) extends
PlannerExpression {
override def toString = s"$expression.in(${elements.mkString(", ")})"
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/time.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/time.scala
index b17537e..f860492 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/time.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -33,7 +33,7 @@ import org.apache.flink.table.validate.{ValidationFailure,
ValidationResult, Val
import scala.collection.JavaConversions._
-case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends
Expression {
+case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends
PlannerExpression {
override private[flink] def children: Seq[Expression] = timeIntervalUnit ::
temporal :: Nil
@@ -85,7 +85,7 @@ case class Extract(timeIntervalUnit: Expression, temporal:
Expression) extends E
abstract class TemporalCeilFloor(
timeIntervalUnit: Expression,
temporal: Expression)
- extends Expression {
+ extends PlannerExpression {
override private[flink] def children: Seq[Expression] = timeIntervalUnit ::
temporal :: Nil
@@ -231,7 +231,7 @@ case class TemporalOverlaps(
leftTemporal: Expression,
rightTimePoint: Expression,
rightTemporal: Expression)
- extends Expression {
+ extends PlannerExpression {
override private[flink] def children: Seq[Expression] =
Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
@@ -330,7 +330,7 @@ case class TemporalOverlaps(
}
}
-case class DateFormat(timestamp: Expression, format: Expression) extends
Expression {
+case class DateFormat(timestamp: Expression, format: Expression) extends
PlannerExpression {
override private[flink] def children = timestamp :: format :: Nil
override private[flink] def toRexNode(implicit relBuilder: RelBuilder) =
@@ -345,7 +345,7 @@ case class TimestampDiff(
timePointUnit: Expression,
timePoint1: Expression,
timePoint2: Expression)
- extends Expression {
+ extends PlannerExpression {
override private[flink] def children: Seq[Expression] =
timePointUnit :: timePoint1 :: timePoint2 :: Nil