wangxianghu commented on a change in pull request #3175:
URL: https://github.com/apache/hudi/pull/3175#discussion_r660328873
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -327,4 +330,19 @@ public void commitOffsetToKafka(String checkpointStr) {
LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
}
}
+
+ private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+ Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+ for (TopicPartition topicPartition : topicPartitions) {
+ OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+ if (committedOffsetAndMetadata != null) {
+ fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+ } else {
+ LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+ fromOffsets = consumer.endOffsets(topicPartitions);
+ break;
Review comment:
> @wangxianghu yes, but that's not happening. I think it is because of the way we explicitly set `fromOffsets` here:
> https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L221
> which moves the consumer to the end, i.e. it starts reading from the 500th offset for partition0 and partition1 in the test case.
>
> Copying the Javadoc for `KafkaConsumer#endOffsets`:
>
> ```
> Get the end offsets for the given partitions. In the default read_uncommitted isolation level, the end offset is the high watermark (that is, the offset of the last successfully replicated message plus one)
> ```
You are right.
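The behaviour discussed in this thread can be modelled without a broker. Below is a minimal, hypothetical plain-Java model of the `getGroupOffsets` fallback in the diff. It assumes partitions can be represented as plain strings and that the `committed` and `endOffsets` maps stand in for what `consumer.committed(...)` and `consumer.endOffsets(...)` would return. `GroupOffsetFallbackSketch` and `resolveFromOffsets` are illustrative names, not Hudi or Kafka APIs. The data mirrors the test case mentioned in the quoted comment (500 records in each of two partitions).

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class GroupOffsetFallbackSketch {

  // Hypothetical stand-in for getGroupOffsets: instead of querying a live
  // KafkaConsumer, the committed offsets and end offsets are passed in as maps.
  static Map<String, Long> resolveFromOffsets(Set<String> partitions,
                                              Map<String, Long> committed,
                                              Map<String, Long> endOffsets) {
    Map<String, Long> fromOffsets = new HashMap<>();
    for (String partition : partitions) {
      Long offset = committed.get(partition);
      if (offset != null) {
        fromOffsets.put(partition, offset);
      } else {
        // Mirrors the diff: one partition without a commit switches *all*
        // partitions to their end offsets (the high watermark).
        return new HashMap<>(endOffsets);
      }
    }
    return fromOffsets;
  }

  public static void main(String[] args) {
    Set<String> partitions = new LinkedHashSet<>();
    partitions.add("topic-0");
    partitions.add("topic-1");

    // 500 records per partition occupy offsets 0..499, so the end offset
    // (last replicated offset + 1) is 500 for each partition.
    Map<String, Long> endOffsets = new HashMap<>();
    endOffsets.put("topic-0", 500L);
    endOffsets.put("topic-1", 500L);

    // The consumer group has not committed anything yet.
    Map<String, Long> committed = new HashMap<>();

    Map<String, Long> fromOffsets = resolveFromOffsets(partitions, committed, endOffsets);
    System.out.println(fromOffsets.get("topic-0") + " " + fromOffsets.get("topic-1"));
  }
}
```

Running `main` prints `500 500`: with no commits for the group, both partitions start at the high watermark, so the 500 records already in each partition are skipped. This matches what the quoted comment observed in the test case.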
--
This is an automated message from the Apache Git Service.