[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365129#comment-16365129 ]
ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5282#discussion_r168377228

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
                         getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
                 } else {
                     // use the partition discoverer to fetch the initial seed partitions,
    -                // and set their initial offsets depending on the startup mode
    -                for (KafkaTopicPartition seedPartition : allPartitions) {
    -                    if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
    -                        subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
    -                    } else {
    +                // and set their initial offsets depending on the startup mode.
    +                // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
    +                // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
    +                // when the partition is actually read.
    +                switch (startupMode) {
    +                    case SPECIFIC_OFFSETS:
                             if (specificStartupOffsets == null) {
                                 throw new IllegalArgumentException(
                                     "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
    -                                    ", but no specific offsets were specified");
    +                                    ", but no specific offsets were specified.");
                             }
    -                        Long specificOffset = specificStartupOffsets.get(seedPartition);
    -                        if (specificOffset != null) {
    -                            // since the specified offsets represent the next record to read, we subtract
    -                            // it by one so that the initial state of the consumer will be correct
    -                            subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    -                        } else {
    -                            // default to group offset behaviour if the user-provided specific offsets
    -                            // do not contain a value for this partition
    -                            subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +                        for (KafkaTopicPartition seedPartition : allPartitions) {
    +                            Long specificOffset = specificStartupOffsets.get(seedPartition);
    +                            if (specificOffset != null) {
    +                                // since the specified offsets represent the next record to read, we subtract
    +                                // it by one so that the initial state of the consumer will be correct
    +                                subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
    +                            } else {
    +                                // default to group offset behaviour if the user-provided specific offsets
    +                                // do not contain a value for this partition
    +                                subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +                            }
    +                        }
    +
    +                        break;
    +                    case TIMESTAMP:
    +                        if (startupOffsetsTimestamp == null) {
    +                            throw new IllegalArgumentException(
    --- End diff --

    That makes sense, will change (including usage in existing code)


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method
> can only let the job consume from the beginning or from the most recent data;
> it cannot specify the offset at which consumption from Kafka should begin.
> So, there should be a configuration item (such as
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that
> allows the user to configure the initial offset of Kafka. The behaviour of
> "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint / savepoint
>   a> The offset of the partition can be restored from the checkpoint/savepoint;
> "flink.source.start.time" will be ignored.
>   b> There is no checkpoint/savepoint state for the partition (for example, the
> partition was newly added); "flink.kafka.start.time" will be used to
> initialize the offset of the partition.
> 2) The job has no checkpoint / savepoint; "flink.source.start.time" is used
> to initialize the offset of Kafka
>   a> "flink.source.start.time" is valid: use it to set the offset of Kafka.
>   b> "flink.source.start.time" is out of range: behave the same as currently
> with no initial offset, i.e. get Kafka's current offset and start reading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
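
For illustration only, the precedence rules listed in the issue description (cases 1a, 1b, 2a, 2b) could be sketched roughly as below. This is a hypothetical, standalone Java sketch and not code from the pull request: the class StartOffsetSketch, its methods, and the Decision type are names invented here, and interpreting the "yyyy-MM-dd HH:mm:ss" value as UTC is an assumption (the issue does not name a time zone). The lookup from a timestamp to a Kafka offset (for example through the Kafka consumer's offsetsForTimes call) is not performed; its result is passed in as a nullable argument, where null stands for "out of range".

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch; none of these names come from Flink or the pull request. */
final class StartOffsetSketch {

    /** Records which rule decided the partition's start position. */
    enum Source { RESTORED_STATE, START_TIME, FALLBACK }

    static final class Decision {
        final Source source;
        final Long offset; // null when the decision is deferred to the fallback behaviour

        Decision(Source source, Long offset) {
            this.source = source;
            this.offset = offset;
        }
    }

    // Format proposed in the issue description.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses the configured start time into epoch milliseconds (UTC is assumed here). */
    static long parseStartTime(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /**
     * @param restoredOffset     offset restored from a checkpoint/savepoint, or null if
     *                           the partition has no restored state (cases 1b and 2)
     * @param offsetForStartTime offset found for the configured start time, or null if
     *                           the start time is out of range (case 2b)
     */
    static Decision resolve(Long restoredOffset, Long offsetForStartTime) {
        if (restoredOffset != null) {
            // case 1a: restored state wins, the start time is ignored
            return new Decision(Source.RESTORED_STATE, restoredOffset);
        }
        if (offsetForStartTime != null) {
            // cases 1b and 2a: no state for this partition, use the start time
            return new Decision(Source.START_TIME, offsetForStartTime);
        }
        // case 2b: out of range, behave as if no initial offset was configured
        return new Decision(Source.FALLBACK, null);
    }
}
```

In this sketch, resolve(42L, 7L) reports RESTORED_STATE with offset 42, resolve(null, 7L) reports START_TIME with offset 7, and resolve(null, null) reports FALLBACK with no offset. Keeping the timestamp lookup outside resolve is a choice made for the sketch so that the precedence rule can be read and tested without a Kafka client.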