tzulitai commented on code in PR #22:
URL: https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159168995


##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+        final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        final String bootstraps = getBootstrapServers();
+
+        // table with upsert-kafka connector, bounded mode up to offset=2
+        final String createTableSql =
+                String.format(
+                        "CREATE TABLE upsert_kafka (\n"
+                                + "  `user_id` BIGINT,\n"
+                                + "  `event_id` BIGINT,\n"
+                                + "  `payload` STRING,\n"
+                                + "  PRIMARY KEY (event_id, user_id) NOT 
ENFORCED"
+                                + ") WITH (\n"
+                                + "  'connector' = 'upsert-kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'key.format' = '%s',\n"
+                                + "  'value.format' = '%s',\n"
+                                + "  'value.fields-include' = 'ALL',\n"
+                                + "  'scan.bounded.mode' = 
'specific-offsets',\n"
+                                + "  'scan.bounded.specific-offsets' = 
'partition:0,offset:2'"
+                                + ")",
+                        topic, bootstraps, format, format);
+        tEnv.executeSql(createTableSql);
+
+        // insert multiple values to have more records past offset=2
+        final String insertValuesSql =
+                "INSERT INTO upsert_kafka\n"
+                        + "VALUES\n"
+                        + " (1, 100, 'payload 1'),\n"
+                        + " (2, 101, 'payload 2'),\n"
+                        + " (3, 102, 'payload 3'),\n"
+                        + " (1, 100, 'payload')";
+        tEnv.executeSql(insertValuesSql).await();
+
+        // results should only have records up to offset=2
+        final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from 
upsert_kafka"));
+        final List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1L, 100L, "payload 1"),
+                        changelogRow("+I", 2L, 101L, "payload 2"));

Review Comment:
   61e919ecca80bd094bcc431c6f298faa52c6f5c0
   
   this new commit clears up my confusion :)



-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
