fresh-borzoni commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3373773768


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/ChangelogSourceITCase.scala:
##########
@@ -441,6 +441,189 @@ class ChangelogSourceITCase(
     }
   }
 
+  @TestTemplate
+  def testFilterPushedDownOnNonUpsertKey(): Unit = {
+    // FLINK-38579: Filter pushed down to source on non-upsert key should 
require UPDATE_BEFORE
+
+    val testDataId = TestValuesTableFactory.registerData(
+      Seq(
+        changelogRow("+I", Int.box(1), "tom", Int.box(1)),
+        changelogRow("-U", Int.box(1), "tom", Int.box(1)),
+        changelogRow("+U", Int.box(1), "tom", Int.box(2))
+      ))
+    tEnv.executeSql(s"""
+                       |CREATE TABLE t (
+                       |  a int primary key not enforced,
+                       |  b varchar,
+                       |  c int
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'data-id' = '$testDataId',
+                       |  'changelog-mode' = 'I,UA,UB,D',
+                       |  'filterable-fields' = 'c'
+                       |)
+                       |""".stripMargin)
+
+    tEnv.executeSql(s"""
+                       |CREATE TABLE s (
+                       |  a int primary key not enforced,
+                       |  b varchar,
+                       |  c int
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'sink-insert-only' = 'false',
+                       |  'sink-changelog-mode-enforced' = 'I,UA,D'
+                       |)
+                       |""".stripMargin)
+
+    // CDC duplicate + MiniBatch is incompatible: ChangelogNormalize (needed 
for CDC deduplication)
+    // requires ONLY_UPDATE_AFTER at the source level, but filter on 
non-upsert key requires UPDATE_BEFORE
+    if (sourceMode == CHANGELOG_SOURCE_WITH_EVENTS_DUPLICATE && miniBatch == 
MiniBatchOn) {
+      assertThatThrownBy(() => tEnv.executeSql("insert into s select * from t 
where c < 2"))
+        .isInstanceOf(classOf[org.apache.flink.table.api.TableException])
+        .hasMessageContaining("Can't generate a valid execution plan")

Review Comment:
   Fixed it here. ChangelogNormalize now accepts BEFORE_AND_AFTER from its 
child, so that combination now plans successfully and produces the right 
result instead of erroring. Added a test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to