Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5282#discussion_r168377156
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+ this.startupOffsetsTimestamp = null;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading partitions from a specified
timestamp.
+ * The specified timestamp must be before the current timestamp.
+ * This lets the consumer ignore any committed group offsets in
Zookeeper / Kafka brokers.
+ *
+ * <p>The consumer will look up the earliest offset whose timestamp is
greater than or equal
+ * to the specific timestamp from Kafka. If there's no such offset, the
consumer will use the
+ * latest offset to read data from kafka.
+ *
+ * <p>This method does not effect where partitions are read from when
the consumer is restored
--- End diff --
Will fix.
---