This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 353dd8d  [FLINK-13378][table-planner-blink] Fix SINGLE_VALUE is not correctly supported in blink planner
353dd8d is described below

commit 353dd8d8087f12fe155537acc8887831cc776c9b
Author: JingsongLi <[email protected]>
AuthorDate: Tue Jul 23 21:28:26 2019 +0800

    [FLINK-13378][table-planner-blink] Fix SINGLE_VALUE is not correctly supported in blink planner
    
    This closes #9208
---
 .../planner/expressions/ExpressionBuilder.java     |  2 +-
 .../planner/expressions/RexNodeConverter.java      | 11 ++++++++++
 .../aggfunctions/SingleValueAggFunction.java       |  2 +-
 .../functions/sql/FlinkSqlOperatorTable.java       |  5 -----
 .../functions/sql/SqlThrowExceptionFunction.java   |  9 ++++----
 .../table/planner/codegen/ExprCodeGenerator.scala  |  9 +++++---
 .../expressions/PlannerExpressionConverter.scala   |  9 ++++++++
 .../flink/table/planner/expressions/call.scala     | 20 ++++++++++++++++-
 .../runtime/batch/sql/join/ScalarQueryITCase.scala | 25 +++++++++++++++++++++-
 9 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
index 512b775..d96e698 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/ExpressionBuilder.java
@@ -140,6 +140,6 @@ public class ExpressionBuilder {
        }
 
        public static Expression throwException(String msg, DataType type) {
-               return call(THROW_EXCEPTION, typeLiteral(type));
+               return call(THROW_EXCEPTION, literal(msg), typeLiteral(type));
        }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
index e562fed..a3d85e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
@@ -49,8 +49,11 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexAggLocalVariable;
 import org.apache.flink.table.planner.calcite.RexDistinctKeyVariable;
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -305,6 +308,14 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
                conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
                conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
                                exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
+
+               // blink expression
+               conversionsOfBuiltInFunc.put(InternalFunctionDefinitions.THROW_EXCEPTION, exprs -> {
+                       DataType type = ((TypeLiteralExpression) exprs.get(1)).getOutputDataType();
+                       return convert(new SqlThrowExceptionFunction(
+                                       typeFactory.createFieldTypeFromLogicalType(fromDataTypeToLogicalType(type))),
+                                       exprs.subList(0, 1));
+               });
        }
 
        @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index 110f9d9..865c0c2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -76,7 +76,7 @@ public abstract class SingleValueAggFunction extends DeclarativeAggregateFunctio
        @Override
        public Expression[] accumulateExpressions() {
                return new Expression[] {
-                       /* value = count == 0 ? exception : operand(0) */
+                       /* value = count > 0 ? exception : operand(0) */
                        ifThenElse(greaterThan(count, ZERO),
                                throwException(ERROR_MSG, getResultType()),
                                operand(0)),
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index b2fd438..50ff193 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -955,11 +955,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
         */
        public static final SqlIncrSumAggFunction INCR_SUM = new SqlIncrSumAggFunction();
 
-       /**
-        * <code>THROW_EXCEPTION</code> scalar function. Only internal used.
-        */
-       public static final SqlFunction THROW_EXCEPTION = new SqlThrowExceptionFunction();
-
        // -----------------------------------------------------------------------------
        // Window SQL functions
        // -----------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
index c767b82..4547a36 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlThrowExceptionFunction.java
@@ -18,24 +18,23 @@
 
 package org.apache.flink.table.planner.functions.sql;
 
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
-import org.apache.calcite.sql.type.SqlTypeFamily;
 
 /**
  * Function used to throw an exception, only used internally.
  */
 public class SqlThrowExceptionFunction extends SqlFunction {
-       public SqlThrowExceptionFunction() {
+       public SqlThrowExceptionFunction(RelDataType returnType) {
                super(
                        "THROW_EXCEPTION",
                        SqlKind.OTHER_FUNCTION,
-                       ReturnTypes.ARG0_NULLABLE,
+                       opBinding -> returnType,
                        null,
-                       OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.ANY),
+                       OperandTypes.STRING,
                        SqlFunctionCategory.USER_DEFINED_FUNCTION);
        }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 3e1a5d3..e641708 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, N
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
 import org.apache.flink.table.planner.codegen.calls.{FunctionGenerator, ScalarFunctionCallGen, StringCallGen, TableFunctionCallGen}
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
@@ -717,14 +718,16 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case STREAMRECORD_TIMESTAMP =>
         generateRowtimeAccess(ctx, contextTerm)
 
-      case THROW_EXCEPTION =>
+      case _: SqlThrowExceptionFunction =>
+        val nullValue = generateNullLiteral(resultType, nullCheck = true)
         val code =
           s"""
              |${operands.map(_.code).mkString("\n")}
+             |${nullValue.code}
              |org.apache.flink.util.ExceptionUtils.rethrow(
-             |  new RuntimeException(${operands(1).resultTerm}.toString()));
+             |  new RuntimeException(${operands.head.resultTerm}.toString()));
              |""".stripMargin
-        GeneratedExpression(operands.head.resultTerm, operands.head.nullTerm, code, resultType)
+        GeneratedExpression(nullValue.resultTerm, nullValue.nullTerm, code, resultType)
 
       case ssf: ScalarSqlFunction =>
         new ScalarFunctionCallGen(ssf.getScalarFunction).generate(ctx, operands, resultType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 7a6c3da..020e5cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -24,9 +24,11 @@ import org.apache.flink.table.expressions.{ApiExpressionVisitor, CallExpression,
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.functions._
 import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => PlannerUUID}
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
+import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 
 import _root_.scala.collection.JavaConverters._
 
@@ -85,6 +87,13 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         val windowReference = translateWindowReference(children.head)
         return RowtimeAttribute(windowReference)
 
+      case THROW_EXCEPTION =>
+        assert(children.size == 2)
+        return ThrowException(
+          children.head.accept(this),
+          fromDataTypeToLegacyInfo(
+            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
+
       case _ =>
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index 5b56b77..8629339 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.expressions
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
 import org.apache.flink.table.functions._
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
@@ -221,3 +221,21 @@ case class PlannerTableFunctionCall(
   override def toString =
     s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
 }
+
+case class ThrowException(msg: PlannerExpression, tp: TypeInformation[_]) extends UnaryExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = tp
+
+  override private[flink] def child: PlannerExpression = msg
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == Types.STRING) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"ThrowException operator requires String input, " +
+          s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"ThrowException($msg)"
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
index 1a948b0..c8db3d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/ScalarQueryITCase.scala
@@ -18,8 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql.join
 
+import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.TestData._
+
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -46,6 +50,25 @@ class ScalarQueryITCase extends BatchTestBase {
     row(6, null)
   )
 
-}
+  @Before
+  override def before(): Unit = {
+    super.before()
+    registerCollection("l", l, INT_DOUBLE, "a, b")
+    registerCollection("r", r, INT_DOUBLE, "c, d")
+  }
 
+  @Test
+  def testScalarSubQuery(): Unit = {
+    checkResult(
+      "SELECT * FROM l WHERE a = (SELECT c FROM r where c = 3)",
+      Seq(row(3, 3.0)))
+  }
+
+  @Test(expected = classOf[JobExecutionException])
+  def testScalarSubQueryException(): Unit = {
+    checkResult(
+      "SELECT * FROM l WHERE a = (SELECT c FROM r)",
+      Seq(row(3, 3.0)))
+  }
+}
 

Reply via email to