Repository: flink Updated Branches: refs/heads/master 82047f723 -> f6c9b32c1
[FLINK-5524] [table] Support early out for code generated AND/OR conditions This closes #3372. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6c9b32c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6c9b32c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6c9b32c Branch: refs/heads/master Commit: f6c9b32c1f1fd7521e2dbe16ab20985a45d7cb07 Parents: 82047f7 Author: Kurt Young <[email protected]> Authored: Tue Feb 21 14:35:17 2017 +0800 Committer: twalthr <[email protected]> Committed: Thu Mar 2 16:13:43 2017 +0100 ---------------------------------------------------------------------- .../table/codegen/calls/ScalarOperators.scala | 152 +++++++++++-------- .../table/expressions/ScalarOperatorsTest.scala | 50 +++++- .../utils/UserDefinedScalarFunctions.scala | 6 + 3 files changed, 134 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f6c9b32c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 3f7c91f..47a81ab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -286,42 +286,51 @@ object ScalarOperators { // Unknown && False -> False // Unknown && Unknown -> Unknown s""" - |${left.code} - |${right.code} - |boolean $resultTerm; - |boolean $nullTerm; - |if (!${left.nullTerm} && !${right.nullTerm}) { - | $resultTerm = ${left.resultTerm} && ${right.resultTerm}; - | $nullTerm = false; - |} - |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = false; - | $nullTerm = false; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { - | $resultTerm = false; - | $nullTerm = false; - |} - |else { - | $resultTerm = false; - | $nullTerm = true; - |} - |""".stripMargin + |${left.code} + | + |boolean $resultTerm = false; + |boolean $nullTerm = false; + |if (!${left.nullTerm} && !${left.resultTerm}) { + | // left expr is false, skip right expr + |} else { + | ${right.code} + | + | if (!${left.nullTerm} && !${right.nullTerm}) { + | $resultTerm = ${left.resultTerm} && ${right.resultTerm}; + | $nullTerm = false; + | } + | else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = false; + | $nullTerm = true; + | } + | else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = false; + | $nullTerm = false; + | } + | else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { + | $resultTerm = false; + | $nullTerm = true; + | } + | else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { + | $resultTerm = false; + | $nullTerm = false; + | } + | else { + | $resultTerm = false; + | $nullTerm = true; + | } + |} + """.stripMargin } else { s""" - |${left.code} - |${right.code} - |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm}; - |""".stripMargin + |${left.code} + |boolean $resultTerm = false; + |if (${left.resultTerm}) { + | ${right.code} + | $resultTerm = ${right.resultTerm}; + |} + |""".stripMargin } GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO) @@ -338,47 +347,56 @@ object ScalarOperators { val operatorCode = if (nullCheck) { // Three-valued logic: // no Unknown -> Two-valued logic - // True && Unknown -> True - // False && Unknown -> Unknown - // Unknown && True -> True - // Unknown && False -> Unknown - // Unknown && Unknown -> Unknown + // True || Unknown -> True + // False || Unknown -> Unknown + // Unknown || True -> True + // Unknown || False -> Unknown + // Unknown || Unknown -> Unknown s""" |${left.code} - |${right.code} - |boolean $resultTerm; - |boolean $nullTerm; - |if (!${left.nullTerm} && !${right.nullTerm}) { - | $resultTerm = ${left.resultTerm} || ${right.resultTerm}; - | $nullTerm = false; - |} - |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = true; - | $nullTerm = false; - |} - |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { - | $resultTerm = true; - | $nullTerm = false; - |} - |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { - | $resultTerm = false; - | $nullTerm = true; - |} - |else { - | $resultTerm = false; - | $nullTerm = true; + | + |boolean $resultTerm = true; + |boolean $nullTerm = false; + |if (!${left.nullTerm} && ${left.resultTerm}) { + | // left expr is true, skip right expr + |} else { + | ${right.code} + | + | if (!${left.nullTerm} && !${right.nullTerm}) { + | $resultTerm = ${left.resultTerm} || ${right.resultTerm}; + | $nullTerm = false; + | } + | else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = true; + | $nullTerm = false; + | } + | else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) { + | $resultTerm = false; + | $nullTerm = true; + | } + | else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) { + | $resultTerm = true; + | $nullTerm = false; + | } + | else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) { + | $resultTerm = false; + | $nullTerm = true; + | } + | else { + | $resultTerm = false; + | $nullTerm = true; + | } |} |""".stripMargin } else { s""" - |${left.code} - |${right.code} - |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm}; + |${left.code} + |boolean $resultTerm = true; + |if (!${left.resultTerm}) { + | ${right.code} + | $resultTerm = ${right.resultTerm}; + |} |""".stripMargin } http://git-wip-us.apache.org/repos/asf/flink/blob/f6c9b32c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala index ea8ac8a..a4dca93 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.types.Row import org.apache.flink.table.api.{Types, ValidationException} import org.apache.flink.table.api.scala._ -import org.apache.flink.table.expressions.utils.ExpressionTestBase +import org.apache.flink.table.expressions.utils.{ExpressionTestBase, ShouldNotExecuteFunc} +import org.apache.flink.table.functions.ScalarFunction import org.junit.Test class ScalarOperatorsTest extends ExpressionTestBase { @@ -89,10 +90,37 @@ class ScalarOperatorsTest extends ExpressionTestBase { testTableApi( +'f8, "+f8", "5") // additional space before "+" required because of checkstyle testTableApi(3.toExpr + 'f8, "3 + f8", "8") - // boolean arithmetic - testTableApi('f6 && true, "f6 && true", "true") - testTableApi('f6 && false, "f6 && false", "false") - testTableApi('f6 || false, "f6 || false", "true") + // boolean arithmetic: AND + testTableApi('f6 && true, "f6 && true", "true") // true && true + testTableApi('f6 && false, "f6 && false", "false") // true && false + testTableApi('f11 && true, "f11 && true", "false") // false && true + testTableApi('f11 && false, "f11 && false", "false") // false && false + testTableApi('f6 && 'f12, "f6 && f12", "null") // true && null + testTableApi('f11 && 'f12, "f11 && f12", "false") // false && null + testTableApi('f12 && true, "f12 && true", "null") // null && true + testTableApi('f12 && false, "f12 && false", "false") // null && false + testTableApi('f12 && 'f12, "f12 && f12", "null") // null && null + testTableApi('f11 && ShouldNotExecuteFunc('f10), // early out + "f11 && ShouldNotExecuteFunc(f10)", "false") + testTableApi('f6 && 'f11 && ShouldNotExecuteFunc('f10), // early out + "f6 && f11 && ShouldNotExecuteFunc(f10)", "false") + + // boolean arithmetic: OR + testTableApi('f6 || true, "f6 || true", "true") // true || true + testTableApi('f6 || false, "f6 || false", "true") // true || false + testTableApi('f11 || true, "f11 || true", "true") // false || true + testTableApi('f11 || false, "f11 || false", "false") // false || false + testTableApi('f6 || 'f12, "f6 || f12", "true") // true || null + testTableApi('f11 || 'f12, "f11 || f12", "null") // false || null + testTableApi('f12 || true, "f12 || true", "true") // null || true + testTableApi('f12 || false, "f12 || false", "null") // null || false + testTableApi('f12 || 'f12, "f12 || f12", "null") // null || null + testTableApi('f6 || ShouldNotExecuteFunc('f10), // early out + "f6 || ShouldNotExecuteFunc(f10)", "true") + testTableApi('f11 || 'f6 || ShouldNotExecuteFunc('f10), // early out + "f11 || f6 || ShouldNotExecuteFunc(f10)", "true") + + // boolean arithmetic: NOT testTableApi(!'f6, "!f6", "false") // comparison @@ -187,7 +215,7 @@ class ScalarOperatorsTest extends ExpressionTestBase { // ---------------------------------------------------------------------------------------------- def testData = { - val testData = new Row(11) + val testData = new Row(13) testData.setField(0, 1: Byte) testData.setField(1, 1: Short) testData.setField(2, 1) @@ -199,6 +227,8 @@ class ScalarOperatorsTest extends ExpressionTestBase { testData.setField(8, 5) testData.setField(9, 10) testData.setField(10, "String") + testData.setField(11, false) + testData.setField(12, null) testData } @@ -214,8 +244,14 @@ class ScalarOperatorsTest extends ExpressionTestBase { Types.DOUBLE, Types.INT, Types.INT, - Types.STRING + Types.STRING, + Types.BOOLEAN, + Types.BOOLEAN ).asInstanceOf[TypeInformation[Any]] } + override def functions: Map[String, ScalarFunction] = Map( + "shouldNotExecuteFunc" -> ShouldNotExecuteFunc + ) + } http://git-wip-us.apache.org/repos/asf/flink/blob/f6c9b32c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala index 4fee3b2..1258137 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala @@ -124,6 +124,12 @@ object Func12 extends ScalarFunction { } } +object ShouldNotExecuteFunc extends ScalarFunction { + def eval(s: String): Boolean = { + throw new Exception("This func should never be executed") + } +} + class RichFunc0 extends ScalarFunction { var openCalled = false var closeCalled = false
