This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b48eb89283f1d61514a4a52263d8291b724da90 Author: beyond1920 <[email protected]> AuthorDate: Wed Jul 31 13:08:17 2019 +0800 [FLINK-13433][table-planner-blink] Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value This closes #9285 --- .../planner/codegen/LookupJoinCodeGenerator.scala | 32 +++- .../plan/nodes/common/CommonLookupJoin.scala | 2 +- .../runtime/batch/sql/join/LookupJoinITCase.scala | 173 ++++++++++++--------- .../runtime/stream/sql/LookupJoinITCase.scala | 119 ++++++++++++-- .../utils/InMemoryLookupableTableSource.scala | 7 +- 5 files changed, 240 insertions(+), 93 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala index 044d8dd..9a9bf2e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala @@ -65,7 +65,7 @@ object LookupJoinCodeGenerator { : GeneratedFunction[FlatMapFunction[BaseRow, BaseRow]] = { val ctx = CodeGeneratorContext(config) - val (prepareCode, parameters) = prepareParameters( + val (prepareCode, parameters, nullInParameters) = prepareParameters( ctx, typeFactory, inputType, @@ -87,11 +87,17 @@ object LookupJoinCodeGenerator { s"$lookupFunctionTerm.setCollector($DEFAULT_COLLECTOR_TERM);" } + // TODO: filter all records when there is any nulls on the join key, because + // "IS NOT DISTINCT FROM" is not supported yet. val body = s""" |$prepareCode |$setCollectorCode - |$lookupFunctionTerm.eval($parameters); + |if ($nullInParameters) { + | return; + |} else { + | $lookupFunctionTerm.eval($parameters); + | } """.stripMargin FunctionCodeGenerator.generateFunction( @@ -118,7 +124,7 @@ object LookupJoinCodeGenerator { : GeneratedFunction[AsyncFunction[BaseRow, AnyRef]] = { val ctx = CodeGeneratorContext(config) - val (prepareCode, parameters) = prepareParameters( + val (prepareCode, parameters, nullInParameters) = prepareParameters( ctx, typeFactory, inputType, @@ -130,11 +136,18 @@ object LookupJoinCodeGenerator { val lookupFunctionTerm = ctx.addReusableFunction(asyncLookupFunction) val DELEGATE = className[DelegatingResultFuture[_]] + // TODO: filter all records when there is any nulls on the join key, because + // "IS NOT DISTINCT FROM" is not supported yet. val body = s""" |$prepareCode - |$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM); - |$lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters); + |if ($nullInParameters) { + | $DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList()); + | return; + |} else { + | $DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM); + | $lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters); + |} """.stripMargin FunctionCodeGenerator.generateFunction( @@ -156,7 +169,7 @@ object LookupJoinCodeGenerator { lookupKeyInOrder: Array[Int], allLookupFields: Map[Int, LookupKey], isExternalArgs: Boolean, - fieldCopy: Boolean): (String, String) = { + fieldCopy: Boolean): (String, String, String) = { val inputFieldExprs = for (i <- lookupKeyInOrder) yield { allLookupFields.get(i) match { @@ -195,9 +208,12 @@ object LookupJoinCodeGenerator { | $newTerm = $assign; |} """.stripMargin - (code, newTerm) + (code, newTerm, e.nullTerm) } - (codeAndArg.map(_._1).mkString("\n"), codeAndArg.map(_._2).mkString(", ")) + ( + codeAndArg.map(_._1).mkString("\n"), + codeAndArg.map(_._2).mkString(", "), + codeAndArg.map(_._3).mkString("|| ")) } /** diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala index 7a2b133..88800da 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala @@ -560,7 +560,7 @@ abstract class CommonLookupJoin( joinType: JoinRelType): Unit = { // check join on all fields of PRIMARY KEY or (UNIQUE) INDEX - if (allLookupKeys.isEmpty || allLookupKeys.isEmpty) { + if (allLookupKeys.isEmpty) { throw new TableException( "Temporal table join requires an equality condition on fields of " + s"table [${tableSource.explainSource()}].") diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala index fe141f9..6fac55e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala @@ -22,9 +22,17 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.table.api.Types import org.apache.flink.table.planner.runtime.utils.{BatchTableEnvUtil, BatchTestBase, InMemoryLookupableTableSource} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized import org.junit.{Before, Test} -class LookupJoinITCase extends BatchTestBase { +import java.lang.Boolean +import java.util + +import scala.collection.JavaConversions._ + +@RunWith(classOf[Parameterized]) +class LookupJoinITCase(isAsyncMode: Boolean) extends BatchTestBase { val data = List( BatchTestBase.row(1L, 12L, "Julian"), @@ -33,6 +41,12 @@ class LookupJoinITCase extends BatchTestBase { BatchTestBase.row(8L, 11L, "Hello world"), BatchTestBase.row(9L, 12L, "Hello world!")) + val dataWithNull = List( + BatchTestBase.row(null, 15L, "Hello"), + BatchTestBase.row(3L, 15L, "Fabian"), + BatchTestBase.row(null, 11L, "Hello world"), + BatchTestBase.row(9L, 12L, "Hello world!")) + val typeInfo = new RowTypeInfo(LONG_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO) val userData = List( @@ -55,19 +69,72 @@ class LookupJoinITCase extends BatchTestBase { .enableAsync() .build() + val userDataWithNull = List( + (11, 1L, "Julian"), + (22, null, "Hello"), + (33, 3L, "Fabian"), + (44, null, "Hello world")) + + val userWithNullDataTableSource = InMemoryLookupableTableSource.builder() + .data(userDataWithNull) + .field("age", Types.INT) + .field("id", Types.LONG) + .field("name", Types.STRING) + .build() + + val userAsyncWithNullDataTableSource = InMemoryLookupableTableSource.builder() + .data(userDataWithNull) + .field("age", Types.INT) + .field("id", Types.LONG) + .field("name", Types.STRING) + .enableAsync() + .build() + + var userTable: String = _ + var userTableWithNull: String = _ + @Before override def before() { super.before() BatchTableEnvUtil.registerCollection(tEnv, "T0", data, typeInfo, "id, len, content") val myTable = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime FROM T0") tEnv.registerTable("T", myTable) + + BatchTableEnvUtil.registerCollection( + tEnv, "T1", dataWithNull, typeInfo, "id, len, content") + val myTable1 = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime FROM T1") + tEnv.registerTable("nullableT", myTable1) + tEnv.registerTableSource("userTable", userTableSource) tEnv.registerTableSource("userAsyncTable", userAsyncTableSource) + userTable = if (isAsyncMode) "userAsyncTable" else "userTable" + + tEnv.registerTableSource("userWithNullDataTable", userWithNullDataTableSource) + tEnv.registerTableSource("userWithNullDataAsyncTable", userAsyncWithNullDataTableSource) + userTableWithNull = if (isAsyncMode) "userWithNullDataAsyncTable" else "userWithNullDataTable" + + // TODO: enable object reuse until [FLINK-12351] is fixed. + env.getConfig.disableObjectReuse() + } + + @Test + def testLeftJoinTemporalTableWithLocalPredicate(): Unit = { + val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN $userTable " + + "for system_time as of T.proctime AS D ON T.id = D.id " + + "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " + + "WHERE T.id > 1" + + val expected = Seq( + BatchTestBase.row(2, 15, "Hello", null, null), + BatchTestBase.row(3, 15, "Fabian", "Fabian", 33), + BatchTestBase.row(8, 11, "Hello world", null, null), + BatchTestBase.row(9, 12, "Hello world!", null, null)) + checkResult(sql, expected, false) } @Test def testJoinTemporalTable(): Unit = { - val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " + + val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " + "for system_time as of T.proctime AS D ON T.id = D.id" val expected = Seq( @@ -79,7 +146,7 @@ class LookupJoinITCase extends BatchTestBase { @Test def testJoinTemporalTableWithPushDown(): Unit = { - val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " + + val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " + "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20" val expected = Seq( @@ -90,7 +157,7 @@ class LookupJoinITCase extends BatchTestBase { @Test def testJoinTemporalTableWithNonEqualFilter(): Unit = { - val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN userTable " + + val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN $userTable " + "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age" val expected = Seq( @@ -101,7 +168,7 @@ class LookupJoinITCase extends BatchTestBase { @Test def testJoinTemporalTableOnMultiFields(): Unit = { - val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " + + val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " + "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name" val expected = Seq( @@ -112,7 +179,7 @@ class LookupJoinITCase extends BatchTestBase { @Test def testJoinTemporalTableOnMultiFieldsWithUdf(): Unit = { - val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " + + val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " + "for system_time as of T.proctime AS D ON mod(T.id, 4) = D.id AND T.content = D.name" val expected = Seq( @@ -123,7 +190,7 @@ class LookupJoinITCase extends BatchTestBase { @Test def testJoinTemporalTableOnMultiKeyFields(): Unit = { - val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " + + val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " + "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id" val expected = Seq( @@ -134,7 +201,7 @@ class LookupJoinITCase extends BatchTestBase { @Test def testLeftJoinTemporalTable(): Unit = { - val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userTable " + + val sql = s"SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN $userTable " + "for system_time as of T.proctime AS D ON T.id = D.id" val expected = Seq( @@ -147,88 +214,50 @@ class LookupJoinITCase extends BatchTestBase { } @Test - def testAsyncJoinTemporalTable(): Unit = { - // TODO: enable object reuse until [FLINK-12351] is fixed. - env.getConfig.disableObjectReuse() - val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userAsyncTable " + - "for system_time as of T.proctime AS D ON T.id = D.id" - - val expected = Seq( - BatchTestBase.row(1, 12, "Julian", "Julian"), - BatchTestBase.row(2, 15, "Hello", "Jark"), - BatchTestBase.row(3, 15, "Fabian", "Fabian")) - checkResult(sql, expected, false) - } - - @Test - def testAsyncJoinTemporalTableWithPushDown(): Unit = { - // TODO: enable object reuse until [FLINK-12351] is fixed. - env.getConfig.disableObjectReuse() - val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userAsyncTable " + - "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20" + def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = { + val sql = s"SELECT T.id, T.len, D.name FROM nullableT T JOIN $userTableWithNull " + + "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id" val expected = Seq( - BatchTestBase.row(2, 15, "Hello", "Jark"), - BatchTestBase.row(3, 15, "Fabian", "Fabian")) + BatchTestBase.row(3,15,"Fabian")) checkResult(sql, expected, false) } @Test - def testAsyncJoinTemporalTableWithNonEqualFilter(): Unit = { - // TODO: enable object reuse until [FLINK-12351] is fixed. - env.getConfig.disableObjectReuse() - val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN userAsyncTable " + - "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age" - + def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = { + val sql = s"SELECT D.id, T.len, D.name FROM nullableT T LEFT JOIN $userTableWithNull " + + "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id" val expected = Seq( - BatchTestBase.row(2, 15, "Hello", "Jark", 22), - BatchTestBase.row(3, 15, "Fabian", "Fabian", 33)) + BatchTestBase.row(null,15,null), + BatchTestBase.row(3,15,"Fabian"), + BatchTestBase.row(null,11,null), + BatchTestBase.row(null,12,null)) checkResult(sql, expected, false) } @Test - def testAsyncLeftJoinTemporalTableWithLocalPredicate(): Unit = { - // TODO: enable object reuse until [FLINK-12351] is fixed. - env.getConfig.disableObjectReuse() - val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN userAsyncTable " + - "for system_time as of T.proctime AS D ON T.id = D.id " + - "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " + - "WHERE T.id > 1" - - val expected = Seq( - BatchTestBase.row(2, 15, "Hello", null, null), - BatchTestBase.row(3, 15, "Fabian", "Fabian", 33), - BatchTestBase.row(8, 11, "Hello world", null, null), - BatchTestBase.row(9, 12, "Hello world!", null, null)) + def testJoinTemporalTableOnNullConstantKey(): Unit = { + val sql = s"SELECT T.id, T.len, T.content FROM T JOIN $userTable " + + "for system_time as of T.proctime AS D ON D.id = null" + val expected = Seq() checkResult(sql, expected, false) } @Test - def testAsyncJoinTemporalTableOnMultiFields(): Unit = { - // TODO: enable object reuse until [FLINK-12351] is fixed. - env.getConfig.disableObjectReuse() - val sql = "SELECT T.id, T.len, D.name FROM T JOIN userAsyncTable " + - "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name" - - val expected = Seq( - BatchTestBase.row(1, 12, "Julian"), - BatchTestBase.row(3, 15, "Fabian")) + def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = { + val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " + + "for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id" + val expected = Seq() checkResult(sql, expected, false) } +} - @Test - def testAsyncLeftJoinTemporalTable(): Unit = { - // TODO: enable object reuse until [FLINK-12351] is fixed. - env.getConfig.disableObjectReuse() - val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userAsyncTable " + - "for system_time as of T.proctime AS D ON T.id = D.id" +object LookupJoinITCase { - val expected = Seq( - BatchTestBase.row(1, 12, "Julian", 11), - BatchTestBase.row(2, 15, "Jark", 22), - BatchTestBase.row(3, 15, "Fabian", 33), - BatchTestBase.row(8, 11, null, null), - BatchTestBase.row(9, 12, null, null)) - checkResult(sql, expected, false) + @Parameterized.Parameters(name = "isAsyncMode = {0}") + def parameters(): util.Collection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(Boolean.TRUE), Array(Boolean.FALSE) + ) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala index c0dbdc7..f145e2b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingTestBase, TestingAppendSink} import org.apache.flink.types.Row -import org.junit.Assert.assertEquals +import org.junit.Assert.{assertEquals, assertTrue} import org.junit.Test import java.lang.{Integer => JInt, Long => JLong} @@ -46,6 +46,11 @@ class LookupJoinITCase extends StreamingTestBase { Row.of(null, new JInt(11), "Hello world"), Row.of(new JLong(9), new JInt(12), "Hello world!")) + val dataRowType:TypeInformation[Row] = new RowTypeInfo( + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO) + val userData = List( (11, 1L, "Julian"), (22, 2L, "Jark"), @@ -65,6 +70,20 @@ class LookupJoinITCase extends StreamingTestBase { .field("name", Types.STRING) .build() + val userDataWithNull = List( + (11, 1L, "Julian"), + (22, null, "Hello"), + (33, 3L, "Fabian"), + (44, null, "Hello world") + ) + + val userWithNullDataTableSourceWith2Keys = InMemoryLookupableTableSource.builder() + .data(userDataWithNull) + .field("age", Types.INT) + .field("id", Types.LONG) + .field("name", Types.STRING) + .build() + @Test def testJoinTemporalTable(): Unit = { val streamTable = env.fromCollection(data) @@ -137,11 +156,7 @@ class LookupJoinITCase extends StreamingTestBase { @Test def testJoinTemporalTableOnNullableKey(): Unit = { - - implicit val tpe: TypeInformation[Row] = new RowTypeInfo( - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO) + implicit val tpe: TypeInformation[Row] = dataRowType val streamTable = env.fromCollection(dataWithNull) .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) tEnv.registerTable("T", streamTable) @@ -366,11 +381,7 @@ class LookupJoinITCase extends StreamingTestBase { @Test def testLeftJoinTemporalTableOnNullableKey(): Unit = { - - implicit val tpe: TypeInformation[Row] = new RowTypeInfo( - BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO) + implicit val tpe: TypeInformation[Row] = dataRowType val streamTable = env.fromCollection(dataWithNull) .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) tEnv.registerTable("T", streamTable) @@ -418,4 +429,90 @@ class LookupJoinITCase extends StreamingTestBase { assertEquals(0, userTableSource.getResourceCounter) } + @Test + def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = { + implicit val tpe: TypeInformation[Row] = dataRowType + val streamTable = env.fromCollection(dataWithNull) + .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) + tEnv.registerTable("T", streamTable) + + tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys) + + val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " + + "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id" + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "3,15,Fabian") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + assertEquals(0, userTableSourceWith2Keys.getResourceCounter) + } + + @Test + def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = { + implicit val tpe: TypeInformation[Row] = dataRowType + val streamTable = env.fromCollection(dataWithNull) + .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) + tEnv.registerTable("T", streamTable) + + tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys) + + val sql = "SELECT D.id, T.len, D.name FROM T LEFT JOIN userTable " + + "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id" + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "null,15,null", + "3,15,Fabian", + "null,11,null", + "null,12,null") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + assertEquals(0, userTableSourceWith2Keys.getResourceCounter) + } + + @Test + def testJoinTemporalTableOnNullConstantKey(): Unit = { + implicit val tpe: TypeInformation[Row] = dataRowType + val streamTable = env.fromCollection(dataWithNull) + .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) + tEnv.registerTable("T", streamTable) + + tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys) + + val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " + + "for system_time as of T.proctime AS D ON D.id = null" + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + assertTrue(sink.getAppendResults.isEmpty) + assertEquals(0, userTableSource.getResourceCounter) + } + + @Test + def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = { + val streamTable = env.fromCollection(data) + .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) + tEnv.registerTable("T", streamTable) + + tEnv.registerTableSource("userTable", userTableSourceWith2Keys) + + val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " + + "for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id" + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + assertTrue(sink.getAppendResults.isEmpty) + assertEquals(0, userTableSourceWith2Keys.getResourceCounter) + } + } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala index 33d1f60..9997741 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala @@ -205,6 +205,8 @@ object InMemoryLookupableTableSource { @varargs def eval(inputs: AnyRef*): Unit = { val key = Row.of(inputs: _*) + Preconditions.checkArgument(!inputs.contains(null), + s"Lookup key %s contains null value, which would not happen.", key) data.get(key) match { case Some(list) => list.foreach(result => collect(result)) case None => // do nothing @@ -236,8 +238,11 @@ object InMemoryLookupableTableSource { @varargs def eval(resultFuture: CompletableFuture[util.Collection[Row]], inputs: AnyRef*): Unit = { + val key = Row.of(inputs: _*) + Preconditions.checkArgument(!inputs.contains(null), + s"Lookup key %s contains null value, which would not happen.", key) CompletableFuture - .supplyAsync(new CollectionSupplier(data, Row.of(inputs: _*)), executor) + .supplyAsync(new CollectionSupplier(data, key), executor) .thenAccept(new CollectionConsumer(resultFuture)) }
