haohui commented on a change in pull request #17994:
URL: https://github.com/apache/flink/pull/17994#discussion_r772266822
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -146,6 +146,7 @@
protected final boolean upsertMode;
protected final String tableIdentifier;
+ protected final Map<KafkaTopicPartition, Long> boundedEndOffsets;
Review comment:
Add comments and move it close to specificStartupOffsets
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -158,6 +158,13 @@
.withDescription(
"Optional offsets used in case of
\"specific-offsets\" startup mode");
+ public static final ConfigOption<String> SCAN_END_SPECIFIC_OFFSETS =
Review comment:
A better name could be `scan.bounded.specific-offsets` to be consistent
with the table API
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -161,6 +162,7 @@ public KafkaDynamicSource(
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
boolean upsertMode,
+ @Nullable Map<KafkaTopicPartition, Long> boundedEndOffsets,
Review comment:
why nullable?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -963,6 +980,7 @@ private static KafkaDynamicSink createExpectedSink(
tableOptions.put("properties.bootstrap.servers", "dummy");
tableOptions.put("scan.startup.mode", "specific-offsets");
tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
+ tableOptions.put("scan.end.specific-offsets",
PROPS_SCAN_BOUNDED_OFFSETS);
Review comment:
I don't think it is necessary to change the default settings, but it
would be helpful to add a specific unit test (via mocks) to show that the
bounded offsets are correctly set up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]