[ https://issues.apache.org/jira/browse/HUDI-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362564#comment-17362564 ]
Vinay commented on HUDI-1910:
-----------------------------
[~nishith29] Instead of updating HoodieWriteCommitCallbackMessage and
asking users to enable the callback config to commit offsets to Kafka, I have
another approach in mind: should we just accept a DeltaStreamer config flag,
--commit-offset-to-kafka?
We already get the checkpointStr, which contains the end offset of each
partition, here:
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L265]
If the commit is successful and commit-offset-to-kafka is true
([https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L474]),
we can commit the offsets back to Kafka as well:
{code:java}
private void commitOffsetToKafka(String checkpointStr) {
  // checkpointStr => hoodie_test,0:300000,1:350000
  // offsetMap     => {hoodie_test-0=300000, hoodie_test-1=350000}
  Map<TopicPartition, Long> offsetMap =
      KafkaOffsetGen.CheckpointUtils.strToOffsets(checkpointStr);

  // Reuse the source's Kafka settings (bootstrap.servers, group.id, deserializers, ...),
  // dropping Hudi-specific "hoodie.*" keys. group.id must be present for the commit to work.
  Map<String, Object> kafkaParams = new HashMap<>();
  props.keySet().stream()
      .map(Object::toString)
      .filter(prop -> !prop.startsWith("hoodie."))
      .forEach(prop -> kafkaParams.put(prop, props.get(prop)));

  Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap =
      new HashMap<>(offsetMap.size());
  offsetMap.forEach((partition, offset) ->
      offsetAndMetadataMap.put(partition, new OffsetAndMetadata(offset)));

  try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(kafkaParams)) {
    consumer.commitAsync(offsetAndMetadataMap, (offsets, exception) -> {
      // The callback also fires on failure, so only report success when no exception is set.
      if (exception != null) {
        LOG.warn("Failed to commit offsets to Kafka: " + offsets, exception);
      } else {
        LOG.info("Offsets committed to Kafka successfully: " + offsets);
      }
    });
  }
}
{code}
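A minimal, dependency-free sketch of the checkpoint format shown in the comments of the snippet, assuming it is "topic,partition:offset,partition:offset,...". CheckpointParseSketch and parseCheckpoint are hypothetical names; this is not Hudi's KafkaOffsetGen.CheckpointUtils.strToOffsets, and it keys the map by "topic-partition" strings instead of Kafka's TopicPartition so it runs without the Kafka client on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaOffsetGen.CheckpointUtils.strToOffsets (not Hudi code):
// uses "topic-partition" string keys instead of Kafka's TopicPartition.
public class CheckpointParseSketch {

  // Parses "topic,partition:offset,partition:offset,..." into {"topic-partition" -> offset},
  // preserving the order in which partitions appear in the checkpoint string.
  static Map<String, Long> parseCheckpoint(String checkpointStr) {
    String[] parts = checkpointStr.split(",");
    if (parts.length < 2) {
      throw new IllegalArgumentException(
          "Expected a topic followed by partition:offset pairs: " + checkpointStr);
    }
    String topic = parts[0];
    Map<String, Long> offsets = new LinkedHashMap<>();
    for (int i = 1; i < parts.length; i++) {
      String[] partitionAndOffset = parts[i].split(":");
      if (partitionAndOffset.length != 2) {
        throw new IllegalArgumentException("Malformed partition:offset pair: " + parts[i]);
      }
      int partition = Integer.parseInt(partitionAndOffset[0].trim());
      long offset = Long.parseLong(partitionAndOffset[1].trim());
      offsets.put(topic + "-" + partition, offset);
    }
    return offsets;
  }

  public static void main(String[] args) {
    Map<String, Long> offsets = parseCheckpoint("hoodie_test,0:300000,1:350000");
    System.out.println(offsets); // prints {hoodie_test-0=300000, hoodie_test-1=350000}
  }
}
```

Kafka treats a committed offset as the position of the next record to consume. If the per-partition end offsets stored in the checkpoint are exclusive (as Spark's OffsetRange.untilOffset is), they can be committed unchanged, which is what the proposal does.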
What do you think of this approach?
> Supporting Kafka based checkpointing for HoodieDeltaStreamer
> ------------------------------------------------------------
>
> Key: HUDI-1910
> URL: https://issues.apache.org/jira/browse/HUDI-1910
> Project: Apache Hudi
> Issue Type: Improvement
> Components: DeltaStreamer
> Reporter: Nishith Agarwal
> Assignee: Vinay
> Priority: Major
> Labels: sev:normal, triaged
>
> HoodieDeltaStreamer currently supports commit-metadata-based checkpoints. Some
> users have requested support for Kafka-based checkpoints for freshness
> auditing purposes. This ticket tracks the implementation of that.
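The freshness-auditing use case comes down to consumer lag: once offsets are committed to Kafka under the DeltaStreamer's group.id, lag per partition is the log end offset minus the committed offset, which is what kafka-consumer-groups.sh --describe reports in its LAG column. A hypothetical sketch of that arithmetic on plain maps; LagSketch and computeLag are illustrative names, and the offsets in main are made up rather than measured.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-partition consumer lag from offsets keyed by "topic-partition".
public class LagSketch {

  // lag = log end offset - committed offset, floored at 0.
  // A partition with no committed offset is treated as committed at 0 for simplicity.
  static Map<String, Long> computeLag(Map<String, Long> logEndOffsets,
                                      Map<String, Long> committedOffsets) {
    Map<String, Long> lag = new HashMap<>();
    for (Map.Entry<String, Long> entry : logEndOffsets.entrySet()) {
      long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
      lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
    }
    return lag;
  }

  public static void main(String[] args) {
    // Illustrative offsets only.
    Map<String, Long> logEnd = Map.of("hoodie_test-0", 300500L, "hoodie_test-1", 350000L);
    Map<String, Long> committed = Map.of("hoodie_test-0", 300000L, "hoodie_test-1", 350000L);
    Map<String, Long> lag = computeLag(logEnd, committed);
    System.out.println(lag.get("hoodie_test-0")); // prints 500
    System.out.println(lag.get("hoodie_test-1")); // prints 0
  }
}
```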
--
This message was sent by Atlassian Jira
(v8.3.4#803005)