tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159159130


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+        final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to offset=2
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 'specific-offsets',\n"
+                                + "  'scan.bounded.specific-offsets' = 'partition:0,offset:2'"
+                                + ")",
+                        topic, bootstraps, format, format);
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values to have more records past offset=2
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, 100, 'payload 1'),\n"
+                        + " (2, 101, 'payload 2'),\n"
+                        + " (3, 102, 'payload 3'),\n"
+                        + " (1, 100, 'payload')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should only have records up to offset=2
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"));
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1L, 100L, "payload 1"),
+                        changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Ah, yes you're right. Another one of those mind-twists I'm still trying to wrap my head around 😅
   So the case I mentioned would never happen then - if the end offset is right after a `+I` that happens to be an update, then the consumer side would normalize it as a `-U` and `+U`. I'll update the test to cover this case.
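   For reference, the normalization being discussed can be illustrated with a small standalone sketch (this is *not* Flink's actual `upsert-kafka` source code, just an assumed model of its changelog semantics): the first record for a key is emitted as `+I`, and any subsequent record for the same key is expanded into a `-U` for the previous value followed by a `+U` for the new one. The class and record names below are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class UpsertNormalizationSketch {

       /** One changelog entry: kind is "+I", "-U", or "+U". */
       record Change(String kind, String key, String value) {}

       /** Expand a keyed upsert stream into a normalized changelog. */
       static List<Change> normalize(List<Map.Entry<String, String>> records) {
           // Remembers the last value seen per key, in arrival order.
           Map<String, String> lastValue = new LinkedHashMap<>();
           List<Change> changelog = new ArrayList<>();
           for (Map.Entry<String, String> r : records) {
               String old = lastValue.put(r.getKey(), r.getValue());
               if (old == null) {
                   // First time this key is seen: plain insert.
                   changelog.add(new Change("+I", r.getKey(), r.getValue()));
               } else {
                   // Key seen before: retract old value, then emit the update.
                   changelog.add(new Change("-U", r.getKey(), old));
                   changelog.add(new Change("+U", r.getKey(), r.getValue()));
               }
           }
           return changelog;
       }

       public static void main(String[] args) {
           // Mirrors the test's data: key 100 appears twice, so its second
           // record is an update rather than an insert.
           List<Map.Entry<String, String>> records =
                   List.of(
                           Map.entry("100", "payload 1"),
                           Map.entry("101", "payload 2"),
                           Map.entry("100", "payload"));
           for (Change c : normalize(records)) {
               System.out.println(c.kind() + " " + c.key() + " " + c.value());
           }
           // Prints:
           // +I 100 payload 1
           // +I 101 payload 2
           // -U 100 payload 1
           // +U 100 payload
       }
   }
   ```

   Under this model, bounding the scan at offset 2 stops reading before the second record for key 100 arrives, which is why the test's expected list contains only the two `+I` rows.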



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]