This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e07ca0  [FLINK-13782] Implement type inference for more logical expressions
8e07ca0 is described below

commit 8e07ca008b6eada80cf7668442b81cffc6a7dc0c
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu May 28 13:53:54 2020 +0200

    [FLINK-13782] Implement type inference for more logical expressions
    
    Implemented input & output type inference for:
    AND, OR, NOT, IS NULL, IS NOT NULL, IS TRUE, IS FALSE, IS NOT TRUE,
    IS NOT FALSE, BETWEEN, NOT BETWEEN
    
    This closes #12387
---
 .../functions/BuiltInFunctionDefinitions.java      |  50 ++++++--
 .../expressions/PlannerExpressionConverter.scala   |  44 -------
 .../table/planner/expressions/comparison.scala     | 127 ---------------------
 .../flink/table/planner/expressions/logic.scala    |  40 -------
 4 files changed, 39 insertions(+), 222 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 2b7c47a..8a72ecc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparision;
 import org.apache.flink.util.Preconditions;
 
 import java.lang.reflect.Field;
@@ -41,9 +43,12 @@ import static 
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.TWO_EQUALS_COMPARABLE;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.TWO_FULLY_COMPARABLE;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.sequence;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.wildcardWithCount;
 import static org.apache.flink.table.types.inference.TypeStrategies.explicit;
 import static org.apache.flink.table.types.inference.TypeStrategies.nullable;
 
@@ -58,19 +63,34 @@ public final class BuiltInFunctionDefinitions {
                new BuiltInFunctionDefinition.Builder()
                        .name("and")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       .inputTypeStrategy(
+                               varyingSequence(
+                                       logical(LogicalTypeRoot.BOOLEAN),
+                                       logical(LogicalTypeRoot.BOOLEAN),
+                                       logical(LogicalTypeRoot.BOOLEAN)
+                               )
+                       )
+                       
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
                        .build();
        public static final BuiltInFunctionDefinition OR =
                new BuiltInFunctionDefinition.Builder()
                        .name("or")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       .inputTypeStrategy(
+                               varyingSequence(
+                                       logical(LogicalTypeRoot.BOOLEAN),
+                                       logical(LogicalTypeRoot.BOOLEAN),
+                                       logical(LogicalTypeRoot.BOOLEAN)
+                               )
+                       )
+                       
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
                        .build();
        public static final BuiltInFunctionDefinition NOT =
                new BuiltInFunctionDefinition.Builder()
                        .name("not")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+                       
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
                        .build();
        public static final BuiltInFunctionDefinition IF =
                new BuiltInFunctionDefinition.Builder()
@@ -126,49 +146,57 @@ public final class BuiltInFunctionDefinitions {
                new BuiltInFunctionDefinition.Builder()
                        .name("isNull")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
+                       
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
                        .build();
        public static final BuiltInFunctionDefinition IS_NOT_NULL =
                new BuiltInFunctionDefinition.Builder()
                        .name("isNotNull")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
+                       
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
                        .build();
        public static final BuiltInFunctionDefinition IS_TRUE =
                new BuiltInFunctionDefinition.Builder()
                        .name("isTrue")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+                       
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
                        .build();
        public static final BuiltInFunctionDefinition IS_FALSE =
                new BuiltInFunctionDefinition.Builder()
                        .name("isFalse")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+                       
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
                        .build();
        public static final BuiltInFunctionDefinition IS_NOT_TRUE =
                new BuiltInFunctionDefinition.Builder()
                        .name("isNotTrue")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+                       
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
                        .build();
        public static final BuiltInFunctionDefinition IS_NOT_FALSE =
                new BuiltInFunctionDefinition.Builder()
                        .name("isNotFalse")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
+                       
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
                        .build();
        public static final BuiltInFunctionDefinition BETWEEN =
                new BuiltInFunctionDefinition.Builder()
                        .name("between")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(comparable(ConstantArgumentCount.of(3), 
StructuredComparision.FULL))
+                       
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
                        .build();
        public static final BuiltInFunctionDefinition NOT_BETWEEN =
                new BuiltInFunctionDefinition.Builder()
                        .name("notBetween")
                        .kind(SCALAR)
-                       .outputTypeStrategy(TypeStrategies.MISSING)
+                       
.inputTypeStrategy(comparable(ConstantArgumentCount.of(3), 
StructuredComparision.FULL))
+                       
.outputTypeStrategy(nullable(explicit(DataTypes.BOOLEAN())))
                        .build();
 
        // aggregate functions
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 97a3c19..ea98b0d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -160,58 +160,14 @@ class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExp
             expr.validateInput()
             expr
 
-          case AND =>
-            assert(args.size >= 2)
-            args.reduceLeft(And)
-
-          case OR =>
-            assert(args.size >= 2)
-            args.reduceLeft(Or)
-
-          case NOT =>
-            assert(args.size == 1)
-            Not(args.head)
-
           case IN =>
             assert(args.size > 1)
             In(args.head, args.drop(1))
 
-          case IS_NULL =>
-            assert(args.size == 1)
-            IsNull(args.head)
-
-          case IS_NOT_NULL =>
-            assert(args.size == 1)
-            IsNotNull(args.head)
-
-          case IS_TRUE =>
-            assert(args.size == 1)
-            IsTrue(args.head)
-
-          case IS_FALSE =>
-            assert(args.size == 1)
-            IsFalse(args.head)
-
-          case IS_NOT_TRUE =>
-            assert(args.size == 1)
-            IsNotTrue(args.head)
-
-          case IS_NOT_FALSE =>
-            assert(args.size == 1)
-            IsNotFalse(args.head)
-
           case IF =>
             assert(args.size == 3)
             If(args.head, args(1), args.last)
 
-          case BETWEEN =>
-            assert(args.size == 3)
-            Between(args.head, args(1), args.last)
-
-          case NOT_BETWEEN =>
-            assert(args.size == 3)
-            NotBetween(args.head, args(1), args.last)
-
           case DISTINCT =>
             assert(args.size == 1)
             DistinctAgg(args.head)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
deleted file mode 100644
index b72d4d6..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.{isArray, 
isComparable, isNumeric}
-import org.apache.flink.table.planner.validate._
-import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-
-import org.apache.calcite.sql.SqlOperator
-
-abstract class BinaryComparison extends BinaryExpression {
-  private[flink] def sqlOperator: SqlOperator
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult =
-    (left.resultType, right.resultType) match {
-      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess
-      case (lType, rType) if isComparable(lType) && lType == rType => 
ValidationSuccess
-      case (lType, rType) if isComparable(lType) &&
-          fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) 
=>
-        ValidationSuccess
-      case (lType, rType) =>
-        ValidationFailure(
-          s"Comparison is only supported for numeric types and " +
-            s"comparable types of same type, got $lType and $rType")
-    }
-}
-
-case class IsNull(child: PlannerExpression) extends UnaryExpression {
-  override def toString = s"($child).isNull"
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsNotNull(child: PlannerExpression) extends UnaryExpression {
-  override def toString = s"($child).isNotNull"
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsTrue(child: PlannerExpression) extends UnaryExpression {
-  override def toString = s"($child).isTrue"
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsFalse(child: PlannerExpression) extends UnaryExpression {
-  override def toString = s"($child).isFalse"
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsNotTrue(child: PlannerExpression) extends UnaryExpression {
-  override def toString = s"($child).isNotTrue"
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-case class IsNotFalse(child: PlannerExpression) extends UnaryExpression {
-  override def toString = s"($child).isNotFalse"
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-}
-
-abstract class BetweenComparison(
-    expr: PlannerExpression,
-    lowerBound: PlannerExpression,
-    upperBound: PlannerExpression)
-  extends PlannerExpression {
-
-  override private[flink] def resultType: TypeInformation[_] = 
BasicTypeInfo.BOOLEAN_TYPE_INFO
-
-  override private[flink] def children: Seq[PlannerExpression] = Seq(expr, 
lowerBound, upperBound)
-
-  override private[flink] def validateInput(): ValidationResult = {
-    (expr.resultType, lowerBound.resultType, upperBound.resultType) match {
-      case (exprType, lowerType, upperType)
-          if isNumeric(exprType) && isNumeric(lowerType) && 
isNumeric(upperType) =>
-        ValidationSuccess
-      case (exprType, lowerType, upperType)
-          if isComparable(exprType) && exprType == lowerType && exprType == 
upperType =>
-        ValidationSuccess
-      case (exprType, lowerType, upperType) =>
-        ValidationFailure(
-          s"Between is only supported for numeric types and " +
-            s"identical comparable types, but got $exprType, $lowerType and 
$upperType"
-        )
-    }
-  }
-}
-
-case class Between(
-    expr: PlannerExpression,
-    lowerBound: PlannerExpression,
-    upperBound: PlannerExpression)
-  extends BetweenComparison(expr, lowerBound, upperBound) {
-
-  override def toString: String = s"($expr).between($lowerBound, $upperBound)"
-}
-
-case class NotBetween(
-    expr: PlannerExpression,
-    lowerBound: PlannerExpression,
-    upperBound: PlannerExpression)
-  extends BetweenComparison(expr, lowerBound, upperBound) {
-
-  override def toString: String = s"($expr).notBetween($lowerBound, 
$upperBound)"
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
index ec6c88f..2d3bd70 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/logic.scala
@@ -22,46 +22,6 @@ import org.apache.flink.table.planner.validate._
 import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isDecimal
 
-abstract class BinaryPredicate extends BinaryExpression {
-  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
-        right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"$this only accepts children of Boolean type, " +
-        s"get $left : ${left.resultType} and $right : ${right.resultType}")
-    }
-  }
-}
-
-case class Not(child: PlannerExpression) extends UnaryExpression {
-
-  override def toString = s"!($child)"
-
-  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Not operator requires a boolean expression as input, 
" +
-        s"but $child is of type ${child.resultType}")
-    }
-  }
-}
-
-case class And(left: PlannerExpression, right: PlannerExpression) extends 
BinaryPredicate {
-
-  override def toString = s"$left && $right"
-}
-
-case class Or(left: PlannerExpression, right: PlannerExpression) extends 
BinaryPredicate {
-
-  override def toString = s"$left || $right"
-}
-
 @deprecated(
   "Use ifThenElse(...) instead. It is available through the implicit Scala 
DSL.",
   "1.8.0")

Reply via email to