This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78d1e68ab120e2fecb931e6d9ea1ee8bef247cde Author: beyond1920 <[email protected]> AuthorDate: Fri Aug 2 10:13:45 2019 +0800 [FLINK-13509][table-planner-blink] Forbidden `IS NOT DISTINCT FROM `(or an expanded version) in LookupJoin. --- .../physical/common/CommonLookupJoinRule.scala | 24 ++++++++++++++++++++++ .../plan/batch/sql/join/LookupJoinTest.scala | 22 ++++++++++++++++++++ .../plan/stream/sql/join/LookupJoinTest.scala | 22 ++++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala index 08510b2..62bd3dd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala @@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin import org.apache.flink.table.planner.plan.nodes.logical._ import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.planner.plan.utils.JoinUtil import org.apache.flink.table.sources.{LookupableTableSource, TableSource} import org.apache.calcite.plan.RelOptRule.{any, operand} @@ -30,6 +31,10 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexProgram} +import java.util + +import scala.collection.JavaConversions._ + /** * Base implementation for both * [[org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecLookupJoinRule]] and @@ -86,6 +91,23 @@ trait 
CommonLookupJoinRule { } } + // TODO Support `IS NOT DISTINCT FROM` in the future: FLINK-13509 + protected def validateJoin(join: FlinkLogicalJoin): Unit = { + + val filterNulls: Array[Boolean] = { + val filterNulls = new util.ArrayList[java.lang.Boolean] + JoinUtil.createJoinInfo(join.getLeft, join.getRight, join.getCondition, filterNulls) + filterNulls.map(_.booleanValue()).toArray + } + + if (filterNulls.contains(false)) { + throw new TableException( + s"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " + + s"alternative '(a = b) or (a IS NULL AND b IS NULL)'), the join condition is " + + s"'${join.getCondition}'") + } + } + protected def transform( join: FlinkLogicalJoin, input: FlinkLogicalRel, @@ -115,6 +137,7 @@ abstract class BaseSnapshotOnTableScanRule(description: String) val tableScan = call.rel[RelNode](3) val tableSource = findTableSource(tableScan).orNull + validateJoin(join) val temporalJoin = transform(join, input, tableSource, None) call.transformTo(temporalJoin) } @@ -145,6 +168,7 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String) val tableScan = call.rel[RelNode](4) val tableSource = findTableSource(tableScan).orNull + validateJoin(join) val temporalJoin = transform( join, input, tableSource, Some(calc.getProgram)) call.transformTo(temporalJoin) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala index 64e1a8b..ae0414dd 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala @@ -86,6 +86,28 @@ class LookupJoinTest extends TableTestBase { } @Test + def 
testNotDistinctFromInJoinCondition(): Unit = { + + // does not support join condition contains `IS NOT DISTINCT` + expectExceptionThrown( + "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id", + "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " + + "alternative '(a = b) or (a IS NULL AND b IS NULL)')", + classOf[TableException] + ) + + // does not support join condition contains `IS NOT DISTINCT` and similar syntax + expectExceptionThrown( + "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)", + "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " + + "alternative '(a = b) or (a IS NULL AND b IS NULL)')", + classOf[TableException] + ) + } + + @Test def testLogicalPlan(): Unit = { val sql1 = """ diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala index 2b21265..5bf012b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala @@ -101,6 +101,28 @@ class LookupJoinTest extends TableTestBase with Serializable { } @Test + def testNotDistinctFromInJoinCondition(): Unit = { + + // does not support join condition contains `IS NOT DISTINCT` + expectExceptionThrown( + "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id", + "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " + + "alternative '(a = b) or (a IS NULL 
AND b IS NULL)')", + classOf[TableException] + ) + + // does not support join condition contains `IS NOT DISTINCT` and similar syntax + expectExceptionThrown( + "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " + + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)", + "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " + + "alternative '(a = b) or (a IS NULL AND b IS NULL)')", + classOf[TableException] + ) + } + + @Test def testInvalidLookupTableFunction(): Unit = { streamUtil.addDataStream[(Int, String, Long, Timestamp)]( "T", 'a, 'b, 'c, 'ts, 'proctime.proctime)
