mas-chen commented on code in PR #20370:
URL: https://github.com/apache/flink/pull/20370#discussion_r994179681
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -125,15 +125,35 @@ static OffsetsInitializer committedOffsets(OffsetResetStrategy offsetResetStrate
/**
* Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
* initialized offset is the offset of the first record whose record timestamp is greater than
- * or equals the give timestamp (milliseconds).
+ * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+ * offset, the initializer will default to the latest offset.
*
* @param timestamp the timestamp (milliseconds) to start the consumption.
* @return an {@link OffsetsInitializer} which initializes the offsets based on the given
* timestamp.
* @see KafkaAdminClient#listOffsets(Map)
*/
static OffsetsInitializer timestamp(long timestamp) {
- return new TimestampOffsetsInitializer(timestamp);
+ return new TimestampOffsetsInitializer(timestamp, OffsetResetStrategy.LATEST);
+ }
+
+ /**
+ * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
+ * initialized offset is the offset of the first record whose record timestamp is greater than
+ * or equals the give timestamp (milliseconds). If the timestamp does not correspond to any
+ * offset, the initializer will default to the {@link OffsetResetStrategy} to find the offsets.
+ * For example, if {@link OffsetResetStrategy#NONE} is specified, then this implementation will
+ * throw an exception.
+ *
+ * @param timestamp the timestamp (milliseconds) to start the consumption.
+ * @param offsetResetStrategy the offset reset strategy to use when the timestamp does not
+ * correspond an existing offset.
+ * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
+ * timestamp.
+ * @see KafkaAdminClient#listOffsets(Map)
+ */
+ static OffsetsInitializer timestamp(long timestamp, OffsetResetStrategy offsetResetStrategy) {
Review Comment:
Thanks for the review, I agree with your approach.
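For reference, the fallback behavior described in the Javadoc above can be sketched without any Flink or Kafka dependencies. This is only an illustration under stated assumptions, not the PR's implementation: the class `TimestampFallbackSketch`, the enum `ResetStrategy`, and the method `resolveStartOffset` are hypothetical names, and a partition is modeled as an array of record timestamps indexed by offset.

```java
public class TimestampFallbackSketch {

    // Hypothetical stand-in for Kafka's OffsetResetStrategy, to keep the sketch dependency-free.
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /**
     * Returns the offset of the first record whose timestamp is greater than or equal to
     * targetTimestamp. If no such record exists, falls back to the given strategy.
     * The array index is treated as the record offset (an assumption of this sketch).
     */
    static long resolveStartOffset(
            long[] recordTimestamps, long targetTimestamp, ResetStrategy strategy) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= targetTimestamp) {
                return offset;
            }
        }
        // No record matches the timestamp: apply the reset strategy.
        switch (strategy) {
            case EARLIEST:
                return 0L;
            case LATEST:
                // The "latest" offset is the position of the next record to be written.
                return recordTimestamps.length;
            default:
                throw new IllegalStateException(
                        "No offset found for timestamp " + targetTimestamp
                                + " and reset strategy is NONE");
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 300L};
        System.out.println(resolveStartOffset(timestamps, 150L, ResetStrategy.LATEST));   // prints 1
        System.out.println(resolveStartOffset(timestamps, 400L, ResetStrategy.LATEST));   // prints 3
        System.out.println(resolveStartOffset(timestamps, 400L, ResetStrategy.EARLIEST)); // prints 0
    }
}
```

With the overload proposed in the diff, the corresponding call would be `OffsetsInitializer.timestamp(timestamp, OffsetResetStrategy.NONE)` when a missing offset should fail fast instead of silently starting from the latest offset.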