Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5282#discussion_r168377173
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+ this.startupOffsetsTimestamp = null;
+ this.specificStartupOffsets = null;
+ return this;
+ }
+
+ /**
+ * Specifies the consumer to start reading partitions from a specified timestamp.
+ * The specified timestamp must be before the current timestamp.
+ * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+ *
+ * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
+ * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+ * latest offset to read data from Kafka.
+ *
+ * <p>This method does not affect where partitions are read from when the consumer is restored
+ * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+ * savepoint, only the offsets in the restored state will be used.
+ *
+ * @return The consumer object, to allow function chaining.
+ */
+ // NOTE -
+ // This method is implemented in the base class because this is where the startup logging and verifications live.
+ // However, it is not publicly exposed since only newer Kafka versions support the functionality.
+ // Version-specific subclasses which can expose the functionality should override and allow public access.
+ protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
+ checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
--- End diff --
I'll change this to a more meaningful error message.
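For illustration only, here is a rough sketch of what a more descriptive check could look like. It is not the code that ends up in the PR. The class `StartupTimestampCheck` and the helper `validateStartupTimestamp` are hypothetical names, and the current time is passed in as a parameter purely to keep the sketch testable. Since the argument is a primitive `long`, a null check can never fail, so the sketch validates the value range instead: non-negative, and not after the current time, as the Javadoc requires.

```java
public class StartupTimestampCheck {

    /**
     * Hypothetical helper, not part of Flink: validates a startup timestamp
     * given in epoch milliseconds and returns it unchanged if it is valid.
     */
    static long validateStartupTimestamp(long startupOffsetsTimestamp, long currentTimestamp) {
        // A primitive long cannot be null, so check the value itself.
        if (startupOffsetsTimestamp < 0) {
            throw new IllegalArgumentException(
                "The startup offsets timestamp must be non-negative, but was "
                    + startupOffsetsTimestamp + ".");
        }
        // Assumption: a timestamp equal to the current time is accepted here.
        if (startupOffsetsTimestamp > currentTimestamp) {
            throw new IllegalArgumentException(String.format(
                "The startup offsets timestamp (%d) must not be after the current time (%d).",
                startupOffsetsTimestamp, currentTimestamp));
        }
        return startupOffsetsTimestamp;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // A timestamp one second in the past passes validation.
        System.out.println(validateStartupTimestamp(now - 1000L, now));

        // A timestamp in the future is rejected with a descriptive message.
        try {
            validateStartupTimestamp(now + 60_000L, now);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the consumer itself the same conditions could be expressed with a precondition utility that takes a message template; the point is only that the message names the offending value and the constraint it violated.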
---