xuyangzhong commented on code in PR #27698:
URL: https://github.com/apache/flink/pull/27698#discussion_r2922006238


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala:
##########
@@ -775,6 +775,70 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
     assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
   }
 
+  @TestTemplate
+  def testLookupJoinAfterJoin(): Unit = {
+    val data1 = List(
+      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
+      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      // mismatch
+      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3))
+    )
+
+    val data2 = List(
+      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
+      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      // mismatch
+      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
+    )
+
+    prepareTable(List("a0"), List("b0"), data1, data2)
+
+    val dimData = List(
+      changelogRow("+I", Int.box(2), "s2"),
+      // mismatch
+      changelogRow("+I", Int.box(4), "s4")
+    )
+
+    tEnv.executeSql(s"""
+                       |create table dim (
+                       |  id int primary key not enforced,
+                       |  dim_value string
+                       |) with (
+                       |  'connector' = 'values',
+                       |  'data-id' = 
'${TestValuesTableFactory.registerData(dimData)}'
+                       |)""".stripMargin)
+
+    // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the 
RowKind from
+    // "+U" to "+I"
+    val expected = List(
+      "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22, s2]"
+    )
+
+    tEnv.executeSql("alter table testSnk add (dim_value string)")
+
+    tEnv
+      .executeSql("""
+                    |insert into testSnk
+                    | select a1, a0, a2, b0, b1, b2, dim_value
+                    | from (
+                    |   select
+                    |     *, proctime() as pt
+                    |   from testLeft
+                    |   join testRight
+                    |     on a0 = b0
+                    | ) tmp
+                    | join dim
+                    |   for system_time as of pt
+                    |     on tmp.a0 = dim.id
+                    | on conflict do deduplicate
+                    |""".stripMargin)
+      .await()
+    val result = TestValuesTableFactory.getResultsAsStrings("testSnk")
+
+    assertThat(result.sorted).isEqualTo(expected.sorted)

Review Comment:
   `withExpectedLookupFunctionInvokeCount` is used in the `TestSpec` for 
templated SQL in delta join tests. Here, since the SQL that chains a delta 
join with a subsequent lookup join is more customized, I didn’t use `TestSpec`. 
Instead of `withExpectedLookupFunctionInvokeCount`, I asserted the invoke count 
directly with 
`assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to