MonsterChenzhuo commented on a change in pull request #17994:
URL: https://github.com/apache/flink/pull/17994#discussion_r782766011
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -158,6 +158,13 @@
.withDescription(
"Optional offsets used in case of \"specific-offsets\" startup mode");
+ public static final ConfigOption<String> SCAN_BOUNDED_SPECIFIC_OFFSETS =
+ ConfigOptions.key("scan.bounded.specific-offsets")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When all partitions have reached their stop offsets, the source will exit");
+
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
Review comment:
Hi @ruanhang1993, could you check whether my understanding is correct?
There are two ways to specify the start position: an offset or a timestamp.
When the start position is an offset, it is either a specified offset or the
currently consumed offset, and the end position corresponds to
`scan.bounded.stop-offsets`.
When the start position is a timestamp, the end position should also be a
timestamp.
So the logic could be changed as follows. When a bounded offset is set, first
check whether `scan.startup.mode` is set. If it is not set, use the
`scan.bounded.stop-offsets` logic. If it is set, check whether it is a
timestamp: if not, use the `scan.bounded.stop-offsets` logic; if it is, use
the `scan.bounded.stop-timestamp` logic.
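
To make the branching concrete, here is a minimal sketch of the decision I have in mind. It is not code from this PR: the class `BoundedStopSketch`, the enum `StopLogic`, and the method `chooseStopLogic` are hypothetical names used only for illustration. It also assumes that the timestamp startup mode is spelled `timestamp` in `scan.startup.mode`, and that `scan.bounded.stop-offsets` and `scan.bounded.stop-timestamp` are the option names we end up with.

```java
import java.util.Optional;

// Hypothetical helper, for illustration only; not part of the Flink code base.
final class BoundedStopSketch {

    // Which stop logic applies: the proposed `scan.bounded.stop-offsets`
    // or the proposed `scan.bounded.stop-timestamp`.
    enum StopLogic {
        STOP_OFFSETS,
        STOP_TIMESTAMP
    }

    // startupMode is the value of `scan.startup.mode`, or empty if it is not set.
    static StopLogic chooseStopLogic(Optional<String> startupMode) {
        // Startup mode not set: fall back to the stop-offsets logic.
        if (!startupMode.isPresent()) {
            return StopLogic.STOP_OFFSETS;
        }
        // Startup mode set: only the timestamp mode selects the stop-timestamp logic.
        // Assumption: the timestamp startup mode is spelled "timestamp".
        return "timestamp".equals(startupMode.get())
                ? StopLogic.STOP_TIMESTAMP
                : StopLogic.STOP_OFFSETS;
    }
}
```

With this shape, an unset `scan.startup.mode` and every non-timestamp mode take the same path, so only the timestamp case needs special handling.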