This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8e07ca0 [FLINK-13782] Implement type inference for more logical expressions
8e07ca0 is described below
commit 8e07ca008b6eada80cf7668442b81cffc6a7dc0c
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu May 28 13:53:54 2020 +0200
[FLINK-13782] Implement type inference for more logical expressions
Implemented input & output type inference for:
AND, OR, NOT, IS NULL, IS NOT NULL, IS TRUE, IS FALSE, IS NOT TRUE,
IS NOT FALSE, BETWEEN, NOT BETWEEN
This closes #12387
---
.../functions/BuiltInFunctionDefinitions.java | 50 ++++++--
.../expressions/PlannerExpressionConverter.scala | 44 -------
.../table/planner/expressions/comparison.scala | 127 ---------------------
.../flink/table/planner/expressions/logic.scala | 40 -------
4 files changed, 39 insertions(+), 222 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 2b7c47a..8a72ecc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -25,6 +25,8 @@ import
org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import
org.apache.flink.table.types.logical.StructuredType.StructuredComparision;
import org.apache.flink.util.Preconditions;
import java.lang.reflect.Field;
@@ -41,9 +43,12 @@ import static
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_
import static
org.apache.flink.table.types.inference.InputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.TWO_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.sequence;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.wildcardWithCount;
import static org.apache.flink.table.types.inference.TypeStrategies.explicit;
import static org.apache.flink.table.types.inference.TypeStrategies.nullable;
@@ -58,19 +63,34 @@ public final class BuiltInFunctionDefinitions {
new BuiltInFunctionDefinition.Builder()
.name("and")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+ .inputTypeStrategy(
+ varyingSequence(
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeRoot.BOOLEAN)
+ )
+ )
+
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition OR =
new BuiltInFunctionDefinition.Builder()
.name("or")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+ .inputTypeStrategy(
+ varyingSequence(
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeRoot.BOOLEAN)
+ )
+ )
+
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition NOT =
new BuiltInFunctionDefinition.Builder()
.name("not")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition IF =
new BuiltInFunctionDefinition.Builder()
@@ -126,49 +146,57 @@ public final class BuiltInFunctionDefinitions {
new BuiltInFunctionDefinition.Builder()
.name("isNull")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
+
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_NOT_NULL =
new BuiltInFunctionDefinition.Builder()
.name("isNotNull")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
+
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_TRUE =
new BuiltInFunctionDefinition.Builder()
.name("isTrue")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_FALSE =
new BuiltInFunctionDefinition.Builder()
.name("isFalse")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_NOT_TRUE =
new BuiltInFunctionDefinition.Builder()
.name("isNotTrue")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition IS_NOT_FALSE =
new BuiltInFunctionDefinition.Builder()
.name("isNotFalse")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
.build();
public static final BuiltInFunctionDefinition BETWEEN =
new BuiltInFunctionDefinition.Builder()
.name("between")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(comparable(ConstantArgumentCount.of(3),
StructuredComparision.FULL))
+
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
public static final BuiltInFunctionDefinition NOT_BETWEEN =
new BuiltInFunctionDefinition.Builder()
.name("notBetween")
.kind(SCALAR)
- .outputTypeStrategy(TypeStrategies.MISSING)
+
.inputTypeStrategy(comparable(ConstantArgumentCount.of(3),
StructuredComparision.FULL))
+
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
.build();
// aggregate functions
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 97a3c19..ea98b0d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -160,58 +160,14 @@ class PlannerExpressionConverter private extends
ApiExpressionVisitor[PlannerExp
expr.validateInput()
expr
- case AND =>
- assert(args.size >= 2)
- args.reduceLeft(And)
-
- case OR =>
- assert(args.size >= 2)
- args.reduceLeft(Or)
-
- case NOT =>
- assert(args.size == 1)
- Not(args.head)
-
case IN =>
assert(args.size > 1)
In(args.head, args.drop(1))
- case IS_NULL =>
- assert(args.size == 1)
- IsNull(args.head)
-
- case IS_NOT_NULL =>
- assert(args.size == 1)
- IsNotNull(args.head)
-
- case IS_TRUE =>
- assert(args.size == 1)
- IsTrue(args.head)
-
- case IS_FALSE =>
- assert(args.size == 1)
- IsFalse(args.head)
-
- case IS_NOT_TRUE =>
- assert(args.size == 1)
- IsNotTrue(args.head)
-
- case IS_NOT_FALSE =>
- assert(args.size == 1)
- IsNotFalse(args.head)
-
case IF =>
assert(args.size == 3)
If(args.head, args(1), args.last)
- case BETWEEN =>
- assert(args.size == 3)
- Between(args.head, args(1), args.last)
-
- case NOT_BETWEEN =>
- assert(args.size == 3)
- NotBetween(args.head, args(1), args.last)
-
case DISTINCT =>
assert(args.size == 1)
DistinctAgg(args.head)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
deleted file mode 100644
index b72d4d6..0000000
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.{isArray,
isComparable, isNumeric}
-import org.apache.flink.table.planner.validate._
-import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-
-import org.apache.calcite.sql.SqlOperator
-
-abstract class BinaryComparison extends BinaryExpression {
- private[flink] def sqlOperator: SqlOperator
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult =
- (left.resultType, right.resultType) match {
- case (lType, rType) if isNumeric(lType) && isNumeric(rType) =>
ValidationSuccess
- case (lType, rType) if isComparable(lType) && lType == rType =>
ValidationSuccess
- case (lType, rType) if isComparable(lType) &&
- fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType)
=>
- ValidationSuccess
- case (lType, rType) =>
- ValidationFailure(
- s"Comparison is only supported for numeric types and " +
- s"comparable types of same type, got $lType and $rType")
- }
-}
-
-case class IsNull(child: PlannerExpression) extends UnaryExpression {
- override def toString = s"($child).isNull"
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsNotNull(child: PlannerExpression) extends UnaryExpression {
- override def toString = s"($child).isNotNull"
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsTrue(child: PlannerExpression) extends UnaryExpression {
- override def toString = s"($child).isTrue"
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsFalse(child: PlannerExpression) extends UnaryExpression {
- override def toString = s"($child).isFalse"
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsNotTrue(child: PlannerExpression) extends UnaryExpression {
- override def toString = s"($child).isNotTrue"
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsNotFalse(child: PlannerExpression) extends UnaryExpression {
- override def toString = s"($child).isNotFalse"
-
- override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-abstract class BetweenComparison(
- expr: PlannerExpression,
- lowerBound: PlannerExpression,
- upperBound: PlannerExpression)
- extends PlannerExpression {
-
- override private[flink] def resultType: TypeInformation[_] =
BasicTypeInfo.BOOLEAN_TYPE_INFO
-
- override private[flink] def children: Seq[PlannerExpression] = Seq(expr,
lowerBound, upperBound)
-
- override private[flink] def validateInput(): ValidationResult = {
- (expr.resultType, lowerBound.resultType, upperBound.resultType) match {
- case (exprType, lowerType, upperType)
- if isNumeric(exprType) && isNumeric(lowerType) &&
isNumeric(upperType) =>
- ValidationSuccess
- case (exprType, lowerType, upperType)
- if isComparable(exprType) && exprType == lowerType && exprType ==
upperType =>
- ValidationSuccess
- case (exprType, lowerType, upperType) =>
- ValidationFailure(
- s"Between is only supported for numeric types and " +
- s"identical comparable types, but got $exprType, $lowerType and
$upperType"
- )
- }
- }
-}
-
-case class Between(
- expr: PlannerExpression,
- lowerBound: PlannerExpression,
- upperBound: PlannerExpression)
- extends BetweenComparison(expr, lowerBound, upperBound) {
-
- override def toString: String = s"($expr).between($lowerBound, $upperBound)"
-}
-
-case class NotBetween(
- expr: PlannerExpression,
- lowerBound: PlannerExpression,
- upperBound: PlannerExpression)
- extends BetweenComparison(expr, lowerBound, upperBound) {
-
- override def toString: String = s"($expr).notBetween($lowerBound,
$upperBound)"
-}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
index ec6c88f..2d3bd70 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
@@ -22,46 +22,6 @@ import org.apache.flink.table.planner.validate._
import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isDecimal
-abstract class BinaryPredicate extends BinaryExpression {
- override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
- right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"$this only accepts children of Boolean type, " +
- s"get $left : ${left.resultType} and $right : ${right.resultType}")
- }
- }
-}
-
-case class Not(child: PlannerExpression) extends UnaryExpression {
-
- override def toString = s"!($child)"
-
- override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"Not operator requires a boolean expression as input,
" +
- s"but $child is of type ${child.resultType}")
- }
- }
-}
-
-case class And(left: PlannerExpression, right: PlannerExpression) extends
BinaryPredicate {
-
- override def toString = s"$left && $right"
-}
-
-case class Or(left: PlannerExpression, right: PlannerExpression) extends
BinaryPredicate {
-
- override def toString = s"$left || $right"
-}
-
@deprecated(
"Use ifThenElse(...) instead. It is available through the implicit Scala
DSL.",
"1.8.0")