Gerrrr commented on code in PR #22:
URL:
https://github.com/apache/flink-connector-kafka/pull/22#discussion_r1159155943
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java:
##########
@@ -388,6 +391,201 @@ public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
deleteTestTopic(topic);
}
+ @Test
+ public void testUpsertKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+ final String topic = "bounded_upsert_" + format + "_" + UUID.randomUUID();
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+ final String bootstraps = getBootstrapServers();
+
+ // table with upsert-kafka connector, bounded mode up to offset=2
+ final String createTableSql =
+ String.format(
+ "CREATE TABLE upsert_kafka (\n"
+ + " `user_id` BIGINT,\n"
+ + " `event_id` BIGINT,\n"
+ + " `payload` STRING,\n"
+ + " PRIMARY KEY (event_id, user_id) NOT ENFORCED"
+ + ") WITH (\n"
+ + " 'connector' = 'upsert-kafka',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'key.format' = '%s',\n"
+ + " 'value.format' = '%s',\n"
+ + " 'value.fields-include' = 'ALL',\n"
+ + " 'scan.bounded.mode' = 'specific-offsets',\n"
+ + " 'scan.bounded.specific-offsets' = 'partition:0,offset:2'"
+ + ")",
+ topic, bootstraps, format, format);
+ tEnv.executeSql(createTableSql);
+
+ // insert multiple values to have more records past offset=2
+ final String insertValuesSql =
+ "INSERT INTO upsert_kafka\n"
+ + "VALUES\n"
+ + " (1, 100, 'payload 1'),\n"
+ + " (2, 101, 'payload 2'),\n"
+ + " (3, 102, 'payload 3'),\n"
+ + " (1, 100, 'payload')";
+ tEnv.executeSql(insertValuesSql).await();
+
+ // results should only have records up to offset=2
+ final List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from upsert_kafka"));
+ final List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1L, 100L, "payload 1"),
+ changelogRow("+I", 2L, 101L, "payload 2"));
Review Comment:
Doesn't upsert-kafka always write only inserts (`+I`) and deletes (`-D`), with
`-U` and `+U` derived on the consumer side by a normalization node?
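
To make the question concrete, here is a minimal sketch of what I mean by normalization. It is a simplified model and not Flink's actual operator: the class `UpsertNormalizeSketch`, the `Upsert` type, and the string row format are all hypothetical. It assumes the topic only carries "latest value per key" records (with a `null` value as a tombstone) and that the reader keeps the last value per key in state to derive the full changelog.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertNormalizeSketch {

    /** Hypothetical upsert record: a key plus a value; a null value models a tombstone. */
    public static final class Upsert {
        final String key;
        final String value;

        public Upsert(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Derives +I / -U / +U / -D rows from upsert records using per-key state. */
    public static List<String> normalize(List<Upsert> records) {
        Map<String, String> lastValueByKey = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (Upsert record : records) {
            String previous = lastValueByKey.get(record.key);
            if (record.value == null) {
                // Tombstone: retract the previous value, if this key was seen before.
                if (previous != null) {
                    changelog.add("-D[" + record.key + ", " + previous + "]");
                    lastValueByKey.remove(record.key);
                }
            } else if (previous == null) {
                // First value for this key: plain insert.
                changelog.add("+I[" + record.key + ", " + record.value + "]");
                lastValueByKey.put(record.key, record.value);
            } else {
                // Key already known: retract the old value, then emit the new one.
                changelog.add("-U[" + record.key + ", " + previous + "]");
                changelog.add("+U[" + record.key + ", " + record.value + "]");
                lastValueByKey.put(record.key, record.value);
            }
        }
        return changelog;
    }

    public static void main(String[] args) {
        // The first two records written by the test; both keys are new.
        List<Upsert> bounded = Arrays.asList(
                new Upsert("100,1", "payload 1"),
                new Upsert("101,2", "payload 2"));
        normalize(bounded).forEach(System.out::println);
    }
}
```

Under this model, the two records read before the bound carry distinct keys, so both come out as `+I`. An update pair would only appear if the fourth insert, which reuses key (100, 1), were read as well.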