tzulitai commented on code in PR #22:
URL:
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159168995
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue()
throws Exception {
deleteTestTopic(topic);
}
+ @Test
+ public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws
Exception {
+ final String topic = "bounded_upsert_" + format + "_" +
UUID.randomUUID();
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+ final String bootstraps = getBootstrapServers();
+
+ // table with upsert-kafka connector, bounded mode up to offset=2
+ final String createTableSql =
+ String.format(
+ "CREATE TABLE upsert_kafka (\n"
+ + " `user_id` BIGINT,\n"
+ + " `event_id` BIGINT,\n"
+ + " `payload` STRING,\n"
+ + " PRIMARY KEY (event_id, user_id) NOT
ENFORCED"
+ + ") WITH (\n"
+ + " 'connector' = 'upsert-kafka',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'key.format' = '%s',\n"
+ + " 'value.format' = '%s',\n"
+ + " 'value.fields-include' = 'ALL',\n"
+ + " 'scan.bounded.mode' =
'specific-offsets',\n"
+ + " 'scan.bounded.specific-offsets' =
'partition:0,offset:2'"
+ + ")",
+ topic, bootstraps, format, format);
+ tEnv.executeSql(createTableSql);
+
+ // insert multiple values to have more records past offset=2
+ final String insertValuesSql =
+ "INSERT INTO upsert_kafka\n"
+ + "VALUES\n"
+ + " (1, 100, 'payload 1'),\n"
+ + " (2, 101, 'payload 2'),\n"
+ + " (3, 102, 'payload 3'),\n"
+ + " (1, 100, 'payload')";
+ tEnv.executeSql(insertValuesSql).await();
+
+ // results should only have records up to offset=2
+ final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from
upsert_kafka"));
+ final List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1L, 100L, "payload 1"),
+ changelogRow("+I", 2L, 101L, "payload 2"));
Review Comment:
https://github.com/apache/flink-connector-kafka/pull/22/commits/61e919ecca80bd094bcc431c6f298faa52c6f5c0
This new commit answers my confusion. :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]