mas-chen commented on code in PR #20370:
URL: https://github.com/apache/flink/pull/20370#discussion_r994179681


##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java:
##########
@@ -125,15 +125,35 @@ static OffsetsInitializer 
committedOffsets(OffsetResetStrategy offsetResetStrate
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each 
partition so that the
      * initialized offset is the offset of the first record whose record 
timestamp is greater than
-     * or equals the give timestamp (milliseconds).
+     * or equals the give timestamp (milliseconds). If the timestamp does not 
correspond to any
+     * offset, the initializer will default to the latest offset.
      *
      * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets 
based on the given
      *     timestamp.
      * @see KafkaAdminClient#listOffsets(Map)
      */
     static OffsetsInitializer timestamp(long timestamp) {
-        return new TimestampOffsetsInitializer(timestamp);
+        return new TimestampOffsetsInitializer(timestamp, 
OffsetResetStrategy.LATEST);
+    }
+
+    /**
+     * Get an {@link OffsetsInitializer} which initializes the offsets in each 
partition so that the
+     * initialized offset is the offset of the first record whose record 
timestamp is greater than
+     * or equals the give timestamp (milliseconds). If the timestamp does not 
correspond to any
+     * offset, the initializer will default to the {@link OffsetResetStrategy} 
to find the offsets.
+     * For example, if {@link OffsetResetStrategy#NONE} is specified, then 
this implementation will
+     * throw an exception.
+     *
+     * @param timestamp the timestamp (milliseconds) to start the consumption.
+     * @param offsetResetStrategy the offset reset strategy to use when the 
timestamp does not
+     *     correspond an existing offset.
+     * @return an {@link OffsetsInitializer} which initializes the offsets 
based on the given
+     *     timestamp.
+     * @see KafkaAdminClient#listOffsets(Map)
+     */
+    static OffsetsInitializer timestamp(long timestamp, OffsetResetStrategy 
offsetResetStrategy) {

Review Comment:
   Thanks for the review, I agree with your approach. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to