This is an automated email from the ASF dual-hosted git repository.
wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8389464e1613 perf(kafka-connect): memoize file id per partition path in the connect writer (#19016)
8389464e1613 is described below
commit 8389464e161367c917612859a330164e2f7fd406
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jun 16 20:34:19 2026 +0700
perf(kafka-connect): memoize file id per partition path in the connect writer (#19016)
---
.../apache/hudi/connect/writers/AbstractConnectWriter.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 149ace0403cd..628a51d16852 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -35,7 +35,9 @@ import org.apache.kafka.connect.sink.SinkRecord;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Base Hudi Writer that manages reading the raw Kafka records and
@@ -57,6 +59,9 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
// Reused across all records of this writer (one writer is created per
commit, single-threaded),
// instead of re-parsing the schema and rebuilding the converter on every
record.
private AvroConvertor convertor;
+ // Cache fileId per partition path; kafkaPartition is invariant for a writer
(the participant is
+ // bound to a single TopicPartition), so the digest input is stable for a
given partition path.
+ private final Map<String, String> fileIdByPartitionPath = new HashMap<>();
public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
KeyGenerator keyGenerator,
@@ -89,7 +94,12 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
// Tag records with a file ID based on kafka partition and hudi partition.
HoodieRecord<?> hoodieRecord = new
HoodieAvroRecord<>(keyGenerator.getKey(avroRecord.get()), new
HoodieAvroPayload(avroRecord));
- String fileId = KafkaConnectUtils.hashDigest(String.format("%s-%s",
record.kafkaPartition(), hoodieRecord.getPartitionPath()));
+ String partitionPath = hoodieRecord.getPartitionPath();
+ String fileId = fileIdByPartitionPath.get(partitionPath);
+ if (fileId == null) {
+ fileId = KafkaConnectUtils.hashDigest(record.kafkaPartition() + "-" +
partitionPath);
+ fileIdByPartitionPath.put(partitionPath, fileId);
+ }
hoodieRecord.unseal();
hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime,
fileId));
hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));