tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159113822


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+        final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to offset=2
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 'specific-offsets',\n"
+                                + "  'scan.bounded.specific-offsets' = 'partition:0,offset:2'"
+                                + ")",
+                        topic, bootstraps, format, format);
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values to have more records past offset=2
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, 100, 'payload 1'),\n"
+                        + " (2, 101, 'payload 2'),\n"
+                        + " (3, 102, 'payload 3'),\n"
+                        + " (1, 100, 'payload')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should only have records up to offset=2
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from upsert_kafka"));
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1L, 100L, "payload 1"),
+                        changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   Note: when the bound is set using specific offsets, could there be semantic 
issues if the end offset happens to fall between a `-U` and its matching `+U`?
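
   To make the scenario in the question concrete, here is a toy model in plain Java. It is not Flink or connector code. `BoundedChangelogSketch`, `readUntil`, and `endsWithDanglingRetraction` are hypothetical names, and the sketch assumes (1) the log physically stores a `-U` and `+U` as adjacent records and (2) the end offset is exclusive, which matches the test above expecting two rows for `offset:2`. Whether an upsert-kafka topic can contain such a pair at all is exactly what the question is asking; as far as I know the sink writes only the latest value per key plus tombstones for deletes, so the premise may not hold in practice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BoundedChangelogSketch {

    /** Returns the records at offsets [0, endOffset): what a bounded read would emit in this model. */
    static List<String> readUntil(List<String> log, int endOffset) {
        int end = Math.max(0, Math.min(endOffset, log.size()));
        return new ArrayList<>(log.subList(0, end));
    }

    /** True if the last emitted record is a -U, i.e. a retraction whose +U was cut off. */
    static boolean endsWithDanglingRetraction(List<String> emitted) {
        return !emitted.isEmpty() && emitted.get(emitted.size() - 1).startsWith("-U");
    }

    public static void main(String[] args) {
        // Hypothetical changelog: one insert followed by an update of the same key.
        List<String> log = Arrays.asList(
                "+I(1, 100, payload 1)",
                "-U(1, 100, payload 1)",
                "+U(1, 100, payload)");

        // Stopping at offset 2 emits offsets 0 and 1 only, so the +U is never read.
        List<String> emitted = readUntil(log, 2);
        System.out.println(emitted);
        System.out.println("dangling -U: " + endsWithDanglingRetraction(emitted));
    }
}
```

   In this model, stopping at offset 2 leaves the row retracted without its replacement, while stopping at offset 3 does not.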



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.