This is an automated email from the ASF dual-hosted git repository.

wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8389464e1613 perf(kafka-connect): memoize file id per partition path in the connect writer (#19016)
8389464e1613 is described below

commit 8389464e161367c917612859a330164e2f7fd406
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jun 16 20:34:19 2026 +0700

    perf(kafka-connect): memoize file id per partition path in the connect writer (#19016)
---
 .../apache/hudi/connect/writers/AbstractConnectWriter.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 149ace0403cd..628a51d16852 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -35,7 +35,9 @@ import org.apache.kafka.connect.sink.SinkRecord;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Base Hudi Writer that manages reading the raw Kafka records and
@@ -57,6 +59,9 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
   // Reused across all records of this writer (one writer is created per commit, single-threaded),
   // instead of re-parsing the schema and rebuilding the converter on every record.
   private AvroConvertor convertor;
+  // Cache fileId per partition path; kafkaPartition is invariant for a writer (the participant is
+  // bound to a single TopicPartition), so the digest input is stable for a given partition path.
+  private final Map<String, String> fileIdByPartitionPath = new HashMap<>();
 
   public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
                                KeyGenerator keyGenerator,
@@ -89,7 +94,12 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
 
     // Tag records with a file ID based on kafka partition and hudi partition.
     HoodieRecord<?> hoodieRecord = new 
HoodieAvroRecord<>(keyGenerator.getKey(avroRecord.get()), new 
HoodieAvroPayload(avroRecord));
-    String fileId = KafkaConnectUtils.hashDigest(String.format("%s-%s", 
record.kafkaPartition(), hoodieRecord.getPartitionPath()));
+    String partitionPath = hoodieRecord.getPartitionPath();
+    String fileId = fileIdByPartitionPath.get(partitionPath);
+    if (fileId == null) {
+      fileId = KafkaConnectUtils.hashDigest(record.kafkaPartition() + "-" + 
partitionPath);
+      fileIdByPartitionPath.put(partitionPath, fileId);
+    }
     hoodieRecord.unseal();
     hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime, 
fileId));
     hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));

Reply via email to