wangxianghu commented on a change in pull request #3175:
URL: https://github.com/apache/hudi/pull/3175#discussion_r660316249
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -327,4 +330,19 @@ public void commitOffsetToKafka(String checkpointStr) {
       LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
     }
   }
+
+  private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+    for (TopicPartition topicPartition : topicPartitions) {
+      OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+      if (committedOffsetAndMetadata != null) {
+        fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+      } else {
+        LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+        fromOffsets = consumer.endOffsets(topicPartitions);
+        break;
Review comment:
> @wangxianghu
>
> Yes, if you update the value to earliest/latest in this test case the consumer will start reading either from the 0th offset (earliest) or the 500th offset (latest); instead it should start from the 250th offset (as this is the last committed offset)
>
> https://github.com/apache/hudi/blob/80a1f1cceca52f266057b948f48cf20f5d273184/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java#L139
>
> I actually don't like the NONE option here and wanted to use GROUP but the consumer will throw an exception in that case
IIUC, if you have committed the offset to Kafka, then when you restart the consumer with the same `group.id` as before and set `auto.offset.reset` to `latest`, it will continue consuming from the offset committed last time (i.e., the 250th, as you mentioned), right?
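To make the fallback behavior under discussion concrete, here is a minimal, self-contained sketch (not part of the patch): a hypothetical `resolveStartOffsets` helper that models `getGroupOffsets`, with plain `String` keys standing in for `TopicPartition`, a `null` committed value standing in for `consumer.committed(...)` returning null, and a plain map standing in for `consumer.endOffsets(...)`. If every partition has a group commit, consumption resumes from those offsets; if any partition lacks one, everything falls back to the latest (end) offsets, mirroring the `break` in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupOffsetFallback {

  // Returns the committed offsets when every partition has a commit;
  // otherwise falls back to the end ("latest") offsets for all partitions,
  // just as getGroupOffsets breaks out on the first uncommitted partition.
  static Map<String, Long> resolveStartOffsets(Map<String, Long> committed,
                                               Map<String, Long> endOffsets) {
    Map<String, Long> fromOffsets = new HashMap<>();
    for (Map.Entry<String, Long> entry : committed.entrySet()) {
      if (entry.getValue() != null) {
        fromOffsets.put(entry.getKey(), entry.getValue());
      } else {
        // No commit for this partition: start the whole group from latest.
        return new HashMap<>(endOffsets);
      }
    }
    return fromOffsets;
  }

  public static void main(String[] args) {
    Map<String, Long> committed = new HashMap<>();
    committed.put("topic-0", 250L);       // last committed offset
    Map<String, Long> end = new HashMap<>();
    end.put("topic-0", 500L);             // latest offset on the broker
    System.out.println(resolveStartOffsets(committed, end)); // prints {topic-0=250}
  }
}
```

Under this model, the test case in the linked file would resume from offset 250 rather than 0 (earliest) or 500 (latest), which is the behavior the reviewer expects.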
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.