This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b48eb89283f1d61514a4a52263d8291b724da90
Author: beyond1920 <[email protected]>
AuthorDate: Wed Jul 31 13:08:17 2019 +0800

    [FLINK-13433][table-planner-blink] Do not fetch data from 
LookupableTableSource if the JoinKey in left side of LookupJoin contains null 
value
    
    This closes #9285
---
 .../planner/codegen/LookupJoinCodeGenerator.scala  |  32 +++-
 .../plan/nodes/common/CommonLookupJoin.scala       |   2 +-
 .../runtime/batch/sql/join/LookupJoinITCase.scala  | 173 ++++++++++++---------
 .../runtime/stream/sql/LookupJoinITCase.scala      | 119 ++++++++++++--
 .../utils/InMemoryLookupableTableSource.scala      |   7 +-
 5 files changed, 240 insertions(+), 93 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 044d8dd..9a9bf2e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -65,7 +65,7 @@ object LookupJoinCodeGenerator {
     : GeneratedFunction[FlatMapFunction[BaseRow, BaseRow]] = {
 
     val ctx = CodeGeneratorContext(config)
-    val (prepareCode, parameters) = prepareParameters(
+    val (prepareCode, parameters, nullInParameters) = prepareParameters(
       ctx,
       typeFactory,
       inputType,
@@ -87,11 +87,17 @@ object LookupJoinCodeGenerator {
         s"$lookupFunctionTerm.setCollector($DEFAULT_COLLECTOR_TERM);"
     }
 
+    // TODO: filter all records when there is any nulls on the join key, 
because
+    //  "IS NOT DISTINCT FROM" is not supported yet.
     val body =
       s"""
          |$prepareCode
          |$setCollectorCode
-         |$lookupFunctionTerm.eval($parameters);
+         |if ($nullInParameters) {
+         |  return;
+         |} else {
+         |  $lookupFunctionTerm.eval($parameters);
+         | }
       """.stripMargin
 
     FunctionCodeGenerator.generateFunction(
@@ -118,7 +124,7 @@ object LookupJoinCodeGenerator {
     : GeneratedFunction[AsyncFunction[BaseRow, AnyRef]] = {
 
     val ctx = CodeGeneratorContext(config)
-    val (prepareCode, parameters) = prepareParameters(
+    val (prepareCode, parameters, nullInParameters) = prepareParameters(
       ctx,
       typeFactory,
       inputType,
@@ -130,11 +136,18 @@ object LookupJoinCodeGenerator {
     val lookupFunctionTerm = ctx.addReusableFunction(asyncLookupFunction)
     val DELEGATE = className[DelegatingResultFuture[_]]
 
+    // TODO: filter all records when there is any nulls on the join key, 
because
+    //  "IS NOT DISTINCT FROM" is not supported yet.
     val body =
       s"""
          |$prepareCode
-         |$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
-         |$lookupFunctionTerm.eval(delegates.getCompletableFuture(), 
$parameters);
+         |if ($nullInParameters) {
+         |  
$DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList());
+         |  return;
+         |} else {
+         |  $DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
+         |  $lookupFunctionTerm.eval(delegates.getCompletableFuture(), 
$parameters);
+         |}
       """.stripMargin
 
     FunctionCodeGenerator.generateFunction(
@@ -156,7 +169,7 @@ object LookupJoinCodeGenerator {
       lookupKeyInOrder: Array[Int],
       allLookupFields: Map[Int, LookupKey],
       isExternalArgs: Boolean,
-      fieldCopy: Boolean): (String, String) = {
+      fieldCopy: Boolean): (String, String, String) = {
 
     val inputFieldExprs = for (i <- lookupKeyInOrder) yield {
       allLookupFields.get(i) match {
@@ -195,9 +208,12 @@ object LookupJoinCodeGenerator {
              |  $newTerm = $assign;
              |}
              """.stripMargin
-        (code, newTerm)
+        (code, newTerm, e.nullTerm)
       }
-    (codeAndArg.map(_._1).mkString("\n"), codeAndArg.map(_._2).mkString(", "))
+    (
+      codeAndArg.map(_._1).mkString("\n"),
+      codeAndArg.map(_._2).mkString(", "),
+      codeAndArg.map(_._3).mkString("|| "))
   }
 
   /**
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 7a2b133..88800da 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -560,7 +560,7 @@ abstract class CommonLookupJoin(
       joinType: JoinRelType): Unit = {
 
     // check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
-    if (allLookupKeys.isEmpty || allLookupKeys.isEmpty) {
+    if (allLookupKeys.isEmpty) {
       throw new TableException(
         "Temporal table join requires an equality condition on fields of " +
           s"table [${tableSource.explainSource()}].")
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index fe141f9..6fac55e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -22,9 +22,17 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.planner.runtime.utils.{BatchTableEnvUtil, 
BatchTestBase, InMemoryLookupableTableSource}
 
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
-class LookupJoinITCase extends BatchTestBase {
+import java.lang.Boolean
+import java.util
+
+import scala.collection.JavaConversions._
+
+@RunWith(classOf[Parameterized])
+class LookupJoinITCase(isAsyncMode: Boolean) extends BatchTestBase {
 
   val data = List(
     BatchTestBase.row(1L, 12L, "Julian"),
@@ -33,6 +41,12 @@ class LookupJoinITCase extends BatchTestBase {
     BatchTestBase.row(8L, 11L, "Hello world"),
     BatchTestBase.row(9L, 12L, "Hello world!"))
 
+  val dataWithNull = List(
+    BatchTestBase.row(null, 15L, "Hello"),
+    BatchTestBase.row(3L, 15L, "Fabian"),
+    BatchTestBase.row(null, 11L, "Hello world"),
+    BatchTestBase.row(9L, 12L, "Hello world!"))
+
   val typeInfo = new RowTypeInfo(LONG_TYPE_INFO, LONG_TYPE_INFO, 
STRING_TYPE_INFO)
 
   val userData = List(
@@ -55,19 +69,72 @@ class LookupJoinITCase extends BatchTestBase {
     .enableAsync()
     .build()
 
+  val userDataWithNull = List(
+    (11, 1L, "Julian"),
+    (22, null, "Hello"),
+    (33, 3L, "Fabian"),
+    (44, null, "Hello world"))
+
+  val userWithNullDataTableSource = InMemoryLookupableTableSource.builder()
+    .data(userDataWithNull)
+    .field("age", Types.INT)
+    .field("id", Types.LONG)
+    .field("name", Types.STRING)
+    .build()
+
+  val userAsyncWithNullDataTableSource = 
InMemoryLookupableTableSource.builder()
+    .data(userDataWithNull)
+    .field("age", Types.INT)
+    .field("id", Types.LONG)
+    .field("name", Types.STRING)
+    .enableAsync()
+    .build()
+
+  var userTable: String = _
+  var userTableWithNull: String = _
+
   @Before
   override def before() {
     super.before()
     BatchTableEnvUtil.registerCollection(tEnv, "T0", data, typeInfo, "id, len, 
content")
     val myTable = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime  FROM T0")
     tEnv.registerTable("T", myTable)
+
+    BatchTableEnvUtil.registerCollection(
+      tEnv, "T1", dataWithNull, typeInfo, "id, len, content")
+    val myTable1 = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime  FROM T1")
+    tEnv.registerTable("nullableT", myTable1)
+
     tEnv.registerTableSource("userTable", userTableSource)
     tEnv.registerTableSource("userAsyncTable", userAsyncTableSource)
+    userTable = if (isAsyncMode) "userAsyncTable" else "userTable"
+
+    tEnv.registerTableSource("userWithNullDataTable", 
userWithNullDataTableSource)
+    tEnv.registerTableSource("userWithNullDataAsyncTable", 
userAsyncWithNullDataTableSource)
+    userTableWithNull = if (isAsyncMode) "userWithNullDataAsyncTable" else 
"userWithNullDataTable"
+
+    // TODO: enable object reuse until [FLINK-12351] is fixed.
+    env.getConfig.disableObjectReuse()
+  }
+
+  @Test
+  def testLeftJoinTemporalTableWithLocalPredicate(): Unit = {
+    val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN 
$userTable " +
+      "for system_time as of T.proctime AS D ON T.id = D.id " +
+      "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " +
+      "WHERE T.id > 1"
+
+    val expected = Seq(
+      BatchTestBase.row(2, 15, "Hello", null, null),
+      BatchTestBase.row(3, 15, "Fabian", "Fabian", 33),
+      BatchTestBase.row(8, 11, "Hello world", null, null),
+      BatchTestBase.row(9, 12, "Hello world!", null, null))
+    checkResult(sql, expected, false)
   }
 
   @Test
   def testJoinTemporalTable(): Unit = {
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " 
+
       "for system_time as of T.proctime AS D ON T.id = D.id"
 
     val expected = Seq(
@@ -79,7 +146,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableWithPushDown(): Unit = {
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " 
+
       "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20"
 
     val expected = Seq(
@@ -90,7 +157,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableWithNonEqualFilter(): Unit = {
-    val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN 
userTable " +
+    val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN 
$userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= 
D.age"
 
     val expected = Seq(
@@ -101,7 +168,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableOnMultiFields(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = 
D.name"
 
     val expected = Seq(
@@ -112,7 +179,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableOnMultiFieldsWithUdf(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON mod(T.id, 4) = D.id AND 
T.content = D.name"
 
     val expected = Seq(
@@ -123,7 +190,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableOnMultiKeyFields(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = 
D.id"
 
     val expected = Seq(
@@ -134,7 +201,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testLeftJoinTemporalTable(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN $userTable 
" +
       "for system_time as of T.proctime AS D ON T.id = D.id"
 
     val expected = Seq(
@@ -147,88 +214,50 @@ class LookupJoinITCase extends BatchTestBase {
   }
 
   @Test
-  def testAsyncJoinTemporalTable(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN 
userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id"
-
-    val expected = Seq(
-      BatchTestBase.row(1, 12, "Julian", "Julian"),
-      BatchTestBase.row(2, 15, "Hello", "Jark"),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian"))
-    checkResult(sql, expected, false)
-  }
-
-  @Test
-  def testAsyncJoinTemporalTableWithPushDown(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN 
userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20"
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    val sql = s"SELECT T.id, T.len, D.name FROM nullableT T JOIN 
$userTableWithNull " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = 
D.id"
 
     val expected = Seq(
-      BatchTestBase.row(2, 15, "Hello", "Jark"),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian"))
+      BatchTestBase.row(3,15,"Fabian"))
     checkResult(sql, expected, false)
   }
 
   @Test
-  def testAsyncJoinTemporalTableWithNonEqualFilter(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN 
userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= 
D.age"
-
+  def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    val sql = s"SELECT D.id, T.len, D.name FROM nullableT T LEFT JOIN 
$userTableWithNull " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = 
D.id"
     val expected = Seq(
-      BatchTestBase.row(2, 15, "Hello", "Jark", 22),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian", 33))
+      BatchTestBase.row(null,15,null),
+      BatchTestBase.row(3,15,"Fabian"),
+      BatchTestBase.row(null,11,null),
+      BatchTestBase.row(null,12,null))
     checkResult(sql, expected, false)
   }
 
   @Test
-  def testAsyncLeftJoinTemporalTableWithLocalPredicate(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN 
userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id " +
-      "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " +
-      "WHERE T.id > 1"
-
-    val expected = Seq(
-      BatchTestBase.row(2, 15, "Hello", null, null),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian", 33),
-      BatchTestBase.row(8, 11, "Hello world", null, null),
-      BatchTestBase.row(9, 12, "Hello world!", null, null))
+  def testJoinTemporalTableOnNullConstantKey(): Unit = {
+    val sql = s"SELECT T.id, T.len, T.content FROM T JOIN $userTable " +
+      "for system_time as of T.proctime AS D ON D.id = null"
+    val expected = Seq()
     checkResult(sql, expected, false)
   }
 
   @Test
-  def testAsyncJoinTemporalTableOnMultiFields(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = 
D.name"
-
-    val expected = Seq(
-      BatchTestBase.row(1, 12, "Julian"),
-      BatchTestBase.row(3, 15, "Fabian"))
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND null = 
D.id"
+    val expected = Seq()
     checkResult(sql, expected, false)
   }
+}
 
-  @Test
-  def testAsyncLeftJoinTemporalTable(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN 
userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id"
+object LookupJoinITCase {
 
-    val expected = Seq(
-      BatchTestBase.row(1, 12, "Julian", 11),
-      BatchTestBase.row(2, 15, "Jark", 22),
-      BatchTestBase.row(3, 15, "Fabian", 33),
-      BatchTestBase.row(8, 11, null, null),
-      BatchTestBase.row(9, 12, null, null))
-    checkResult(sql, expected, false)
+  @Parameterized.Parameters(name = "isAsyncMode = {0}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(Boolean.TRUE), Array(Boolean.FALSE)
+    )
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index c0dbdc7..f145e2b 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -26,7 +26,7 @@ import 
org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils
 import 
org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, 
StreamingTestBase, TestingAppendSink}
 import org.apache.flink.types.Row
 
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
 import java.lang.{Integer => JInt, Long => JLong}
@@ -46,6 +46,11 @@ class LookupJoinITCase extends StreamingTestBase {
     Row.of(null, new JInt(11), "Hello world"),
     Row.of(new JLong(9), new JInt(12), "Hello world!"))
 
+  val dataRowType:TypeInformation[Row] = new RowTypeInfo(
+    BasicTypeInfo.LONG_TYPE_INFO,
+    BasicTypeInfo.INT_TYPE_INFO,
+    BasicTypeInfo.STRING_TYPE_INFO)
+
   val userData = List(
     (11, 1L, "Julian"),
     (22, 2L, "Jark"),
@@ -65,6 +70,20 @@ class LookupJoinITCase extends StreamingTestBase {
     .field("name", Types.STRING)
     .build()
 
+  val userDataWithNull = List(
+    (11, 1L, "Julian"),
+    (22, null, "Hello"),
+    (33, 3L, "Fabian"),
+    (44, null, "Hello world")
+  )
+
+  val userWithNullDataTableSourceWith2Keys = 
InMemoryLookupableTableSource.builder()
+    .data(userDataWithNull)
+    .field("age", Types.INT)
+    .field("id", Types.LONG)
+    .field("name", Types.STRING)
+    .build()
+
   @Test
   def testJoinTemporalTable(): Unit = {
     val streamTable = env.fromCollection(data)
@@ -137,11 +156,7 @@ class LookupJoinITCase extends StreamingTestBase {
 
   @Test
   def testJoinTemporalTableOnNullableKey(): Unit = {
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
+    implicit val tpe: TypeInformation[Row] = dataRowType
     val streamTable = env.fromCollection(dataWithNull)
       .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
     tEnv.registerTable("T", streamTable)
@@ -366,11 +381,7 @@ class LookupJoinITCase extends StreamingTestBase {
 
   @Test
   def testLeftJoinTemporalTableOnNullableKey(): Unit = {
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
+    implicit val tpe: TypeInformation[Row] = dataRowType
     val streamTable = env.fromCollection(dataWithNull)
       .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
     tEnv.registerTable("T", streamTable)
@@ -418,4 +429,90 @@ class LookupJoinITCase extends StreamingTestBase {
     assertEquals(0, userTableSource.getResourceCounter)
   }
 
+  @Test
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    implicit val tpe: TypeInformation[Row] = dataRowType
+    val streamTable = env.fromCollection(dataWithNull)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = 
D.id"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      "3,15,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+    assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+  }
+
+  @Test
+  def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    implicit val tpe: TypeInformation[Row] = dataRowType
+    val streamTable = env.fromCollection(dataWithNull)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+    val sql = "SELECT D.id, T.len, D.name FROM T LEFT JOIN userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = 
D.id"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq(
+      "null,15,null",
+      "3,15,Fabian",
+      "null,11,null",
+      "null,12,null")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+    assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+  }
+
+  @Test
+  def testJoinTemporalTableOnNullConstantKey(): Unit = {
+    implicit val tpe: TypeInformation[Row] = dataRowType
+    val streamTable = env.fromCollection(dataWithNull)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+    val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " +
+      "for system_time as of T.proctime AS D ON D.id = null"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    assertTrue(sink.getAppendResults.isEmpty)
+    assertEquals(0, userTableSource.getResourceCounter)
+  }
+
+  @Test
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
+    val streamTable = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userTableSourceWith2Keys)
+
+    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND null = 
D.id"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    assertTrue(sink.getAppendResults.isEmpty)
+    assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+  }
+
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
index 33d1f60..9997741 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
@@ -205,6 +205,8 @@ object InMemoryLookupableTableSource {
     @varargs
     def eval(inputs: AnyRef*): Unit = {
       val key = Row.of(inputs: _*)
+      Preconditions.checkArgument(!inputs.contains(null),
+        s"Lookup key %s contains null value, which would not happen.", key)
       data.get(key) match {
         case Some(list) => list.foreach(result => collect(result))
         case None => // do nothing
@@ -236,8 +238,11 @@ object InMemoryLookupableTableSource {
 
     @varargs
     def eval(resultFuture: CompletableFuture[util.Collection[Row]], inputs: 
AnyRef*): Unit = {
+      val key = Row.of(inputs: _*)
+      Preconditions.checkArgument(!inputs.contains(null),
+        s"Lookup key %s contains null value, which would not happen.", key)
       CompletableFuture
-        .supplyAsync(new CollectionSupplier(data, Row.of(inputs: _*)), 
executor)
+        .supplyAsync(new CollectionSupplier(data, key), executor)
         .thenAccept(new CollectionConsumer(resultFuture))
     }
 

Reply via email to