[ 
https://issues.apache.org/jira/browse/HUDI-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362564#comment-17362564
 ] 

Vinay commented on HUDI-1910:
-----------------------------

[~nishith29] Instead of updating HoodieWriteCommitCallbackMessage and asking 
users to enable the callback config in order to commit offsets to Kafka, I have 
another approach in mind. Should we just add a flag to the delta streamer 
config, e.g. --commit-offset-to-kafka?

We already get the checkpointStr, which contains the end offset of each 
partition, here: 
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L265]
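
To make the checkpoint format concrete, here is a minimal sketch of how a string such as hoodie_test,0:300000,1:350000 breaks down into per-partition end offsets. CheckpointSketch is a hypothetical stand-in that uses plain strings instead of Kafka's TopicPartition so it runs without any dependencies. It is not the actual KafkaOffsetGen.CheckpointUtils code, and the validation it performs is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free illustration of the checkpoint string layout:
//   <topic>,<partition>:<offset>,<partition>:<offset>,...
class CheckpointSketch {

  // Returns a map keyed by "<topic>-<partition>" holding each partition's end offset.
  static Map<String, Long> strToOffsets(String checkpointStr) {
    String[] parts = checkpointStr.split(",");
    if (parts.length < 2) {
      throw new IllegalArgumentException(
          "Expected <topic>,<partition>:<offset>,... but got: " + checkpointStr);
    }
    String topic = parts[0];
    Map<String, Long> offsets = new LinkedHashMap<>();
    for (int i = 1; i < parts.length; i++) {
      String[] partitionAndOffset = parts[i].split(":");
      if (partitionAndOffset.length != 2) {
        throw new IllegalArgumentException("Malformed partition entry: " + parts[i]);
      }
      int partition = Integer.parseInt(partitionAndOffset[0]);
      long offset = Long.parseLong(partitionAndOffset[1]);
      offsets.put(topic + "-" + partition, offset);
    }
    return offsets;
  }

  public static void main(String[] args) {
    // prints {hoodie_test-0=300000, hoodie_test-1=350000}
    System.out.println(strToOffsets("hoodie_test,0:300000,1:350000"));
  }
}
```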

If the commit is successful and commit-offset-to-kafka is true 
([https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L474]), 
we can commit the offsets back to Kafka as well:

{code:java}
private void commitOffsetToKafka(String checkpointStr) {
  // checkpointStr => hoodie_test,0:300000,1:350000
  // offsetMap => {hoodie_test-0=300000, hoodie_test-1=350000}
  Map<TopicPartition, Long> offsetMap = KafkaOffsetGen.CheckpointUtils.strToOffsets(checkpointStr);
  // Pass every non-Hudi property through to the consumer. This assumes props
  // already carries group.id and the key/value deserializers.
  Map<String, Object> kafkaParams = new HashMap<>();
  props.keySet().stream()
      .map(Object::toString)
      .filter(key -> !key.startsWith("hoodie."))
      .forEach(key -> kafkaParams.put(key, props.get(key)));
  Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
  offsetMap.forEach((key, value) -> offsetAndMetadataMap.put(key, new OffsetAndMetadata(value)));
  try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(kafkaParams)) {
    consumer.commitAsync(offsetAndMetadataMap, (offsets, exception) -> {
      // The callback also fires on failure, so check the exception before logging success.
      if (exception != null) {
        LOG.warn("Failed to commit offsets to Kafka " + offsets, exception);
      } else {
        LOG.info("Offsets committed to Kafka successfully " + offsets);
      }
    });
  }
}
{code}
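
The filter on props in the snippet above can be tried in isolation. The following is only a sketch: KafkaParamsSketch and toKafkaParams are hypothetical names, and it uses a plain java.util.Properties in place of the delta streamer's own properties object. It shows that keys starting with "hoodie." are dropped and everything else is handed to the consumer unchanged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: keeps only the properties that do not belong to Hudi.
class KafkaParamsSketch {

  static Map<String, Object> toKafkaParams(Properties props) {
    Map<String, Object> kafkaParams = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (!key.startsWith("hoodie.")) {
        kafkaParams.put(key, props.getProperty(key));
      }
    }
    return kafkaParams;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "hudi-delta-streamer");
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "hoodie_test");
    // Only bootstrap.servers and group.id survive the filter.
    System.out.println(toKafkaParams(props).keySet());
  }
}
```

One consequence of this design is that any non-Kafka, non-Hudi key present in the properties would also reach the consumer, so an allow-list may be worth considering.
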
What do you think of this approach?

 

> Supporting Kafka based checkpointing for HoodieDeltaStreamer
> ------------------------------------------------------------
>
>                 Key: HUDI-1910
>                 URL: https://issues.apache.org/jira/browse/HUDI-1910
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: DeltaStreamer
>            Reporter: Nishith Agarwal
>            Assignee: Vinay
>            Priority: Major
>              Labels: sev:normal, triaged
>
> HoodieDeltaStreamer currently supports commit-metadata-based checkpoints. Some 
> users have requested support for Kafka-based checkpoints for freshness 
> auditing purposes. This ticket tracks the implementation of that. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
