Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5282#discussion_r168377228
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
 				getRuntimeContext().getIndexOfThisSubtask(),
 				subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
 		} else {
 			// use the partition discoverer to fetch the initial seed partitions,
-			// and set their initial offsets depending on the startup mode
-			for (KafkaTopicPartition seedPartition : allPartitions) {
-				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
-					subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
-				} else {
+			// and set their initial offsets depending on the startup mode.
+			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
+			// when the partition is actually read.
+			switch (startupMode) {
+				case SPECIFIC_OFFSETS:
 					if (specificStartupOffsets == null) {
 						throw new IllegalArgumentException(
 							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
-								", but no specific offsets were specified");
+								", but no specific offsets were specified.");
 					}
-					Long specificOffset = specificStartupOffsets.get(seedPartition);
-					if (specificOffset != null) {
-						// since the specified offsets represent the next record to read, we subtract
-						// it by one so that the initial state of the consumer will be correct
-						subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
-					} else {
-						// default to group offset behaviour if the user-provided specific offsets
-						// do not contain a value for this partition
-						subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+					for (KafkaTopicPartition seedPartition : allPartitions) {
+						Long specificOffset = specificStartupOffsets.get(seedPartition);
+						if (specificOffset != null) {
+							// since the specified offsets represent the next record to read, we subtract
+							// it by one so that the initial state of the consumer will be correct
+							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
+						} else {
+							// default to group offset behaviour if the user-provided specific offsets
+							// do not contain a value for this partition
+							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+						}
+					}
+
+					break;
+				case TIMESTAMP:
+					if (startupOffsetsTimestamp == null) {
+						throw new IllegalArgumentException(
--- End diff ---
That makes sense, will change (including usage in existing code)
---
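
For reference, a minimal usage sketch of the public setStartFromSpecificOffsets(...) API whose values the diff above consumes. This is not part of the PR; it assumes Flink 1.4.x with the flink-connector-kafka-0.10 dependency, and the broker address, group id, topic name, and offset values are illustrative only.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class SpecificOffsetsExample {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "offsets-example");

		FlinkKafkaConsumer010<String> consumer =
			new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

		// Each offset is the NEXT record to read; internally the consumer keeps
		// (offset - 1) as its initial state, as the diff above shows.
		Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
		specificStartOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
		specificStartOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);
		// Partition 2 is left out on purpose: it falls back to the committed group offset.

		consumer.setStartFromSpecificOffsets(specificStartOffsets);

		env.addSource(consumer).print();
		env.execute("Specific startup offsets example");
	}
}

A partition missing from the map falls back to the committed group offset, matching the GROUP_OFFSET sentinel branch in the diff. The TIMESTAMP case added in the diff is driven analogously by a startup timestamp, exposed as setStartFromTimestamp(long) on the 0.10+ consumers.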