This is an automated email from the ASF dual-hosted git repository.

wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b5879f95ad14 perf(kafka-connect): reuse AvroConvertor across records in the connect writer (#19015)
b5879f95ad14 is described below

commit b5879f95ad1447c41bd75ee284ba509268eaf1f1
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue Jun 16 17:12:36 2026 +0700

    perf(kafka-connect): reuse AvroConvertor across records in the connect writer (#19015)
---
 .../apache/hudi/connect/writers/AbstractConnectWriter.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 70fefe431603..149ace0403cd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -53,6 +53,10 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
   private final KeyGenerator keyGenerator;
   private final SchemaProvider schemaProvider;
   protected final KafkaConnectConfigs connectConfigs;
+  private final String kafkaValueConverter;
+  // Reused across all records of this writer (one writer is created per commit, single-threaded),
+  // instead of re-parsing the schema and rebuilding the converter on every record.
+  private AvroConvertor convertor;
 
   public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
                                KeyGenerator keyGenerator,
@@ -61,23 +65,26 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
     this.keyGenerator = keyGenerator;
     this.schemaProvider = schemaProvider;
     this.instantTime = instantTime;
+    this.kafkaValueConverter = connectConfigs.getKafkaValueConverter();
   }
 
   @Override
   public void writeRecord(SinkRecord record) throws IOException {
-    AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceHoodieSchema());
     Option<GenericRecord> avroRecord;
-    switch (connectConfigs.getKafkaValueConverter()) {
+    switch (kafkaValueConverter) {
       case KAFKA_AVRO_CONVERTER:
         avroRecord = Option.of((GenericRecord) record.value());
         break;
       case KAFKA_STRING_CONVERTER:
+        if (convertor == null) {
+          convertor = new AvroConvertor(schemaProvider.getSourceHoodieSchema());
+        }
         avroRecord = Option.of(convertor.fromJson((String) record.value()));
         break;
       case KAFKA_JSON_CONVERTER:
         throw new UnsupportedEncodingException("Currently JSON objects are not supported");
       default:
-        throw new IOException("Unsupported Kafka Format type (" + connectConfigs.getKafkaValueConverter() + ")");
+        throw new IOException("Unsupported Kafka Format type (" + kafkaValueConverter + ")");
     }
 
     // Tag records with a file ID based on kafka partition and hudi partition.

Reply via email to