PatrickRen commented on code in PR #20480:
URL: https://github.com/apache/flink/pull/20480#discussion_r939658739


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala:
##########
@@ -305,19 +320,147 @@ class AsyncLookupJoinITCase(
     fail("NumberFormatException is expected here!")
   }
 
+  @Test
+  def testLookupCacheSharingAcrossSubtasks(): Unit = {
+    if (!enableCache) {
+      return
+    }
+    // Keep the cache for later validation
+    LookupCacheManager.keepCacheOnRelease(true)
+    try {
+      // Use datagen source here to support parallel running
+      val sourceDdl =
+        s"""
+           |CREATE TABLE T (
+           |  id BIGINT,
+           |  proc AS PROCTIME()
+           |) WITH (
+           |  'connector' = 'datagen',
+           |  'fields.id.kind' = 'sequence',
+           |  'fields.id.start' = '1',
+           |  'fields.id.end' = '6'
+           |)
+           |""".stripMargin
+      tEnv.executeSql(sourceDdl)
+      val sql =
+        """
+          |SELECT T.id, D.name, D.age FROM T 
+          |LEFT JOIN user_table FOR SYSTEM_TIME AS OF T.proc AS D 
+          |ON T.id = D.id
+          |""".stripMargin
+      val sink = new TestingAppendSink
+      tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+      env.execute()
+
+      // Validate that only one cache is registered
+      val managedCaches = LookupCacheManager.getInstance().getManagedCaches
+      assertThat(managedCaches.size()).isEqualTo(1)
+
+      // Validate 6 entries are cached
+      val cache = 
managedCaches.get(managedCaches.keySet().iterator().next()).getCache
+      assertThat(cache.size()).isEqualTo(6)
+
+      // Validate contents of cached entries
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(1L))))
+        .containsExactlyInAnyOrder(
+          GenericRowData.of(ji(11), jl(1L), 
BinaryStringData.fromString("Julian")))
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(2L))))
+        .containsExactlyInAnyOrder(
+          GenericRowData.of(ji(22), jl(2L), 
BinaryStringData.fromString("Jark")))
+      assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(3L))))
+        .containsExactlyInAnyOrder(
+          GenericRowData.of(ji(33), jl(3L), 
BinaryStringData.fromString("Fabian")))
+      
assertThatIterable(cache.getIfPresent(GenericRowData.of(jl(4L)))).isEmpty()
+    } finally {
+      LookupCacheManager.getInstance().checkAllReleased()
+      LookupCacheManager.getInstance().clear()
+      LookupCacheManager.keepCacheOnRelease(false)
+    }
+  }
+
+  def ji(i: Int): java.lang.Integer = {
+    new java.lang.Integer(i)
+  }
+
+  def jl(l: Long): java.lang.Long = {
+    new java.lang.Long(l)
+  }
+
 }
 
 object AsyncLookupJoinITCase {
-  @Parameterized.Parameters(
-    name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, 
AsyncOutputMode={3}")
+
+  val LEGACY_TABLE_SOURCE: JBoolean = JBoolean.TRUE;
+  val DYNAMIC_TABLE_SOURCE: JBoolean = JBoolean.FALSE;
+  val ENABLE_OBJECT_REUSE: JBoolean = JBoolean.TRUE;
+  val DISABLE_OBJECT_REUSE: JBoolean = JBoolean.FALSE;
+  val ENABLE_CACHE: JBoolean = JBoolean.TRUE;
+  val DISABLE_CACHE: JBoolean = JBoolean.FALSE;
+
+  @Parameterized.Parameters(name =
+    "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, 
AsyncOutputMode={3}, EnableCache={4}")
   def parameters(): JCollection[Array[Object]] = {
     Seq[Array[AnyRef]](
-      Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, 
AsyncOutputMode.ALLOW_UNORDERED),
-      Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, 
AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, 
AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, 
AsyncOutputMode.ORDERED),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.FALSE, 
AsyncOutputMode.ALLOW_UNORDERED),
-      Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, 
AsyncOutputMode.ALLOW_UNORDERED)
+      Array(
+        LEGACY_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),
+      Array(
+        LEGACY_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        DISABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        ENABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        HEAP_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ORDERED,
+        ENABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        DISABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        ENABLE_CACHE),
+      Array(
+        DYNAMIC_TABLE_SOURCE,
+        ROCKSDB_BACKEND,
+        ENABLE_OBJECT_REUSE,
+        AsyncOutputMode.ALLOW_UNORDERED,
+        ENABLE_CACHE)

Review Comment:
   I'll remove the ROCKSDB_BACKEND cases with caching as these two seems 
unrelated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to