This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new afbde168b1d8 refactor: Add Lombok annotations to hudi-kafka-connect (#17715)
afbde168b1d8 is described below

commit afbde168b1d809be20c1aca054a0f49e23f97a57
Author: voonhous <[email protected]>
AuthorDate: Fri Dec 26 11:01:27 2025 +0800

    refactor: Add Lombok annotations to hudi-kafka-connect (#17715)
---
 hudi-kafka-connect/pom.xml                         |  6 +++
 .../apache/hudi/connect/HoodieSinkConnector.java   | 16 +++---
 .../org/apache/hudi/connect/HoodieSinkTask.java    | 29 +++++------
 .../connect/KafkaConnectFileIdPrefixProvider.java  | 10 ++--
 .../connect/kafka/KafkaConnectControlAgent.java    | 22 ++++----
 .../hudi/connect/kafka/KafkaControlProducer.java   |  6 +--
 .../transaction/ConnectTransactionCoordinator.java | 58 ++++++++++------------
 .../transaction/ConnectTransactionParticipant.java | 37 ++++++--------
 .../hudi/connect/transaction/CoordinatorEvent.java | 25 ++--------
 .../hudi/connect/transaction/TransactionInfo.java  | 25 ++--------
 .../hudi/connect/utils/KafkaConnectUtils.java      | 14 +++---
 .../connect/writers/AbstractConnectWriter.java     |  5 +-
 .../connect/writers/BufferedConnectWriter.java     | 20 +++-----
 .../writers/KafkaConnectTransactionServices.java   | 19 +++----
 .../writers/KafkaConnectWriterProvider.java        |  6 +--
 .../connect/TestConnectTransactionCoordinator.java |  7 +--
 .../connect/TestConnectTransactionParticipant.java | 17 ++-----
 .../org/apache/hudi/helper/MockKafkaConnect.java   | 17 ++-----
 .../apache/hudi/helper/TestHudiWriterProvider.java | 15 ++----
 .../hudi/writers/TestAbstractConnectWriter.java    |  6 +--
 20 files changed, 136 insertions(+), 224 deletions(-)
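
For readers unfamiliar with Lombok: the annotations introduced by this commit
generate, at compile time, exactly the boilerplate that the diff below
deletes. A minimal illustrative sketch (the class and fields are hypothetical,
not taken from this commit):

    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j             // generates: private static final org.slf4j.Logger log = LoggerFactory.getLogger(Example.class);
    @NoArgsConstructor // generates: public Example() {}
    @Getter            // generates a getter for every field in the class
    public class Example {
      private String name;
      @Setter          // generates a setter for this one field only
      private long offset;

      void demo() {
        log.info("name {} offset {}", name, offset); // SLF4J parameterized logging
      }
    }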

diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index ad655a6e0d5a..ab6045743071 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -129,6 +129,12 @@
             <artifactId>kryo-shaded</artifactId>
         </dependency>
 
+        <!-- Lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-core</artifactId>
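
Side note: Lombok is a compile-time-only dependency. The hunk above declares
it without <version> or <scope>, so both are presumably inherited from the
parent pom's dependencyManagement (an assumption; the parent pom is not shown
here). A standalone declaration would typically pin a version and use
provided scope, e.g.:

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version> <!-- illustrative version only -->
            <scope>provided</scope>
        </dependency>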
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
index 9850d1647078..046cb36a2682 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.connect;
 
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -32,19 +32,15 @@ import java.util.Map;
 /**
  * HudiSinkConnector is a Kafka Connect Connector implementation
  * that ingest data from Kafka to Hudi.
+ *
  */
+@NoArgsConstructor // No-arg constructor. It is instantiated by Connect framework.
+@Slf4j
 public class HoodieSinkConnector extends SinkConnector {
 
   public static final String VERSION = "0.1.0";
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieSinkConnector.class);
   private Map<String, String> configProps;
 
-  /**
-   * No-arg constructor. It is instantiated by Connect framework.
-   */
-  public HoodieSinkConnector() {
-  }
-
   @Override
   public String version() {
     return VERSION;
@@ -72,7 +68,7 @@ public class HoodieSinkConnector extends SinkConnector {
 
   @Override
   public void stop() {
-    LOG.info(String.format("Shutting down Hudi Sink connector %s", configProps.get("name")));
+    log.info("Shutting down Hudi Sink connector {}", configProps.get("name"));
   }
 
   @Override
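
The logging change above is more than cosmetic: LOG.info(String.format(...))
builds the message eagerly even when INFO is disabled, whereas SLF4J's {}
placeholders defer formatting until the level check passes. A before/after
sketch (the variable name is illustrative):

    // before: String.format always runs, even if INFO is disabled
    LOG.info(String.format("Shutting down Hudi Sink connector %s", name));
    // after: the message is assembled only when INFO is enabled
    log.info("Shutting down Hudi Sink connector {}", name);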
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
index 3b274f8b3ba9..bcf126ca7ddb 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
@@ -27,6 +27,7 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
@@ -34,8 +35,6 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -47,10 +46,10 @@ import java.util.Map;
  * from the assigned partitions and commit the Kafka offsets.
  * Also, handles re-assignments of partitions.
  */
+@Slf4j
 public class HoodieSinkTask extends SinkTask {
 
   public static final String TASK_ID_CONFIG_NAME = "task.id";
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieSinkTask.class);
 
   private final Map<TopicPartition, TransactionCoordinator> transactionCoordinators;
   private final Map<TopicPartition, TransactionParticipant> transactionParticipants;
@@ -74,8 +73,7 @@ public class HoodieSinkTask extends SinkTask {
   public void start(Map<String, String> props) {
     connectorName = props.get("name");
     taskId = props.get(TASK_ID_CONFIG_NAME);
-    LOG.info(String.format("Starting Hudi Sink Task for %s connector %s with id %s with assignments %s",
-        props, connectorName, taskId, context.assignment()));
+    log.info("Starting Hudi Sink Task for {} connector {} with id {} with assignments {}", props, connectorName, taskId, context.assignment());
     try {
      connectConfigs = KafkaConnectConfigs.newBuilder().withProperties(props).build();
       controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(
@@ -84,8 +82,8 @@ public class HoodieSinkTask extends SinkTask {
     } catch (ConfigException e) {
      throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
     } catch (ConnectException e) {
-      LOG.error("Couldn't start HudiSinkConnector:", e);
-      LOG.info("Shutting down HudiSinkConnector.");
+      log.error("Couldn't start HudiSinkConnector:", e);
+      log.info("Shutting down HudiSinkConnector.");
       cleanup();
       // Always throw the original exception that prevent us from starting
       throw e;
@@ -147,13 +145,13 @@ public class HoodieSinkTask extends SinkTask {
 
   @Override
   public void open(Collection<TopicPartition> partitions) {
-    LOG.info("New partitions added " + partitions.toString());
+    log.info("New partitions added {}", partitions.toString());
     bootstrap(partitions);
   }
 
   @Override
   public void close(Collection<TopicPartition> partitions) {
-    LOG.info("Existing partitions deleted " + partitions.toString());
+    log.info("Existing partitions deleted {}", partitions.toString());
     // Close any writers we have. We may get assigned the same partitions and end up duplicating
     // some effort since we'll have to reprocess those messages. It may be possible to hold on to
     // the TopicPartitionWriter and continue to use the temp file, but this can get significantly
@@ -172,18 +170,17 @@ public class HoodieSinkTask extends SinkTask {
      TransactionParticipant worker = transactionParticipants.remove(partition);
       if (worker != null) {
         try {
-          LOG.debug("Closing data writer due to task start failure.");
+          log.debug("Closing data writer due to task start failure.");
           worker.stop();
         } catch (Throwable t) {
-          LOG.warn("Error closing and stopping data writer", t);
+          log.warn("Error closing and stopping data writer", t);
         }
       }
     }
   }
 
   private void bootstrap(Collection<TopicPartition> partitions) {
-    LOG.info(String.format("Bootstrap task for connector %s with id %s with assignments %s part %s",
-        connectorName, taskId, context.assignment(), partitions));
+    log.info("Bootstrap task for connector {} with id {} with assignments {} part {}", connectorName, taskId, context.assignment(), partitions);
     for (TopicPartition partition : partitions) {
       try {
         // If the partition is 0, instantiate the Leader
@@ -199,7 +196,7 @@ public class HoodieSinkTask extends SinkTask {
         transactionParticipants.put(partition, worker);
         worker.start();
       } catch (HoodieException exception) {
-        LOG.error(String.format("Fatal error initializing task %s for partition %s", taskId, partition.partition()), exception);
+        log.error("Fatal error initializing task {} for partition {}", taskId, partition.partition(), exception);
       }
     }
   }
@@ -209,10 +206,10 @@ public class HoodieSinkTask extends SinkTask {
       TransactionParticipant worker = transactionParticipants.get(partition);
       if (worker != null) {
         try {
-          LOG.debug("Closing data writer due to task start failure.");
+          log.debug("Closing data writer due to task start failure.");
           worker.stop();
         } catch (Throwable t) {
-          LOG.debug("Error closing and stopping data writer", t);
+          log.debug("Error closing and stopping data writer", t);
         }
       }
     }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
index 59802decdb05..aa8165140215 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
@@ -23,20 +23,19 @@ import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.FileIdPrefixProvider;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
 
   public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition";
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectFileIdPrefixProvider.class);
 
   private final String kafkaPartition;
 
   public KafkaConnectFileIdPrefixProvider(TypedProperties props) {
     super(props);
     if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) {
-      LOG.error("Fatal error due to Kafka Connect Partition Id is not set");
+      log.error("Fatal error due to Kafka Connect Partition Id is not set");
      throw new HoodieException("Kafka Connect Partition Key " + KAFKA_CONNECT_PARTITION_ID + " not provided");
     }
     this.kafkaPartition = props.getProperty(KAFKA_CONNECT_PARTITION_ID);
@@ -48,8 +47,7 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
     // to generate a fixed sized hash.
     String rawFileIdPrefix = kafkaPartition + partitionPath;
     String hashedPrefix = KafkaConnectUtils.hashDigest(rawFileIdPrefix);
-    LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " + partitionPath + " = " + rawFileIdPrefix
-        + " === " + hashedPrefix);
+    log.info("CreateFileId for Kafka Partition {} : {} = {} === {}", kafkaPartition, partitionPath, rawFileIdPrefix, hashedPrefix);
     return hashedPrefix;
   }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
index 041fc03c5962..074a2a39938d 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
@@ -22,6 +22,7 @@ import org.apache.hudi.connect.ControlMessage;
 import org.apache.hudi.connect.transaction.TransactionCoordinator;
 import org.apache.hudi.connect.transaction.TransactionParticipant;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -29,8 +30,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -50,9 +49,8 @@ import java.util.concurrent.TimeUnit;
  * Use a single instance per worker (single-threaded),
  * and register multiple tasks that can receive the control messages.
  */
+@Slf4j
 public class KafkaConnectControlAgent implements KafkaControlAgent {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectControlAgent.class);
   private static final Object LOCK = new Object();
   private static final long KAFKA_POLL_TIMEOUT_MS = 100;
   private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;
@@ -143,7 +141,7 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
         records = consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS));
         for (ConsumerRecord<String, byte[]> record : records) {
           try {
-            LOG.debug("Kafka consumerGroupId = {} topic = {}, partition = {}, offset = {}, customer = {}, country = {}",
+            log.debug("Kafka consumerGroupId = {} topic = {}, partition = {}, offset = {}, customer = {}, country = {}",
                "", record.topic(), record.partition(), record.offset(), record.key(), record.value());
             ControlMessage message = ControlMessage.parseFrom(record.value());
             String senderTopic = message.getTopicName();
@@ -154,23 +152,23 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
                   partitionWorker.processControlEvent(message);
                 }
               } else {
-                LOG.warn("Failed to send message for unregistered participants for topic {}", senderTopic);
+                log.warn("Failed to send message for unregistered participants for topic {}", senderTopic);
               }
            } else if (message.getReceiverType().equals(ControlMessage.EntityType.COORDINATOR)) {
              if (topicCoordinators.containsKey(senderTopic)) {
                topicCoordinators.get(senderTopic).processControlEvent(message);
               }
             } else {
-              LOG.warn("Sender type of Control Message unknown {}", message.getSenderType().name());
+              log.warn("Sender type of Control Message unknown {}", message.getSenderType().name());
             }
           } catch (Exception e) {
-            LOG.error("Fatal error while consuming a kafka record for topic = {} partition = {}", record.topic(), record.partition(), e);
+            log.error("Fatal error while consuming a kafka record for topic = {} partition = {}", record.topic(), record.partition(), e);
           }
         }
         try {
           consumer.commitSync();
         } catch (CommitFailedException exception) {
-          LOG.error("Fatal error while committing kafka control topic");
+          log.error("Fatal error while committing kafka control topic");
         }
       }
     });
@@ -182,16 +180,16 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
     if (executorService != null) {
       boolean terminated = false;
       try {
-        LOG.info("Shutting down executor service.");
+        log.info("Shutting down executor service.");
         executorService.shutdown();
-        LOG.info("Awaiting termination.");
+        log.info("Awaiting termination.");
        terminated = executorService.awaitTermination(EXEC_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         // ignored
       }
 
       if (!terminated) {
-        LOG.warn(
+        log.warn(
             "Unclean Kafka Control Manager executor service shutdown ");
         executorService.shutdownNow();
       }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
index 2161d88485d2..5dde8795f362 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
@@ -20,14 +20,13 @@ package org.apache.hudi.connect.kafka;
 
 import org.apache.hudi.connect.ControlMessage;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Properties;
 
@@ -36,10 +35,9 @@ import java.util.Properties;
  * Control Topic that coordinates transactions
  * across Participants.
  */
+@Slf4j
 public class KafkaControlProducer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaControlProducer.class);
-
   private final String bootstrapServers;
   private final String controlTopicName;
   private Producer<String, byte[]> producer;
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
index e7762b942f37..b3bd854a1e7e 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -28,9 +28,9 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
 import org.apache.hudi.connect.writers.KafkaConnectTransactionServices;
 import org.apache.hudi.exception.HoodieException;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,11 +52,10 @@ import java.util.stream.Collectors;
  * coordinates the Hudi write transactions
  * across all the Kafka partitions for a single Kafka Topic.
  */
+@Slf4j
 public class ConnectTransactionCoordinator implements TransactionCoordinator, Runnable {
 
   public static final int COORDINATOR_KAFKA_PARTITION = 0;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ConnectTransactionCoordinator.class);
   private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
   private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
   private static final String KAFKA_OFFSET_DELIMITER = ",";
@@ -66,6 +65,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
   private static final int COORDINATOR_EVENT_LOOP_TIMEOUT_MS = 1000;
 
   private final KafkaConnectConfigs configs;
+  @Getter
   private final TopicPartition partition;
   private final KafkaControlAgent kafkaControlClient;
   private final ConnectTransactionServices transactionServices;
@@ -120,8 +120,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
       executorService.submit(this);
     }
     kafkaControlClient.registerTransactionCoordinator(this);
-    LOG.info(String.format("Start Transaction Coordinator for topic %s partition %s",
-        partition.topic(), partition.partition()));
+    log.info("Start Transaction Coordinator for topic {} partition {}", partition.topic(), partition.partition());
 
     initializeGlobalCommittedKafkaOffsets();
     // Submit the first start commit
@@ -139,34 +138,29 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
     if (executorService != null) {
       boolean terminated = false;
       try {
-        LOG.info("Shutting down executor service.");
+        log.info("Shutting down executor service.");
         executorService.shutdown();
-        LOG.info("Awaiting termination.");
+        log.info("Awaiting termination.");
        terminated = executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         // ignored
       }
 
       if (!terminated) {
-        LOG.warn(
+        log.warn(
             "Unclean Kafka Control Manager executor service shutdown ");
         executorService.shutdownNow();
       }
     }
   }
 
-  @Override
-  public TopicPartition getPartition() {
-    return partition;
-  }
-
   @Override
   public void processControlEvent(ControlMessage message) {
     CoordinatorEvent.CoordinatorEventType type;
     if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
       type = CoordinatorEvent.CoordinatorEventType.WRITE_STATUS;
     } else {
-      LOG.warn("Illegal message type [{}] processee by coordinator", message.getType().name());
+      log.warn("Illegal message type [{}] processee by coordinator", message.getType().name());
       return;
     }
 
@@ -186,7 +180,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
           processCoordinatorEvent(event);
         }
       } catch (InterruptedException exception) {
-        LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
+        log.warn("Error received while polling the event loop in Partition Coordinator", exception);
       }
     }
   }
@@ -223,7 +217,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
               && currentState.equals(State.ENDED_COMMIT)) {
             onReceiveWriteStatus(event.getMessage());
           } else {
-            LOG.warn("Could not process WRITE_STATUS due to missing message");
+            log.warn("Could not process WRITE_STATUS due to missing message");
           }
           break;
         case ACK_COMMIT:
@@ -236,7 +230,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
           throw new IllegalStateException("Partition Coordinator has received an illegal event type " + event.getEventType().name());
       }
     } catch (Exception exception) {
-      LOG.warn("Error received while polling the event loop in Partition Coordinator", exception);
+      log.warn("Error received while polling the event loop in Partition Coordinator", exception);
     }
   }
 
@@ -253,7 +247,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
               currentCommitTime),
           configs.getCommitIntervalSecs(), TimeUnit.SECONDS);
     } catch (Exception exception) {
-      LOG.error(String.format("Failed to start a new commit %s, will retry", currentCommitTime), exception);
+      log.error("Failed to start a new commit {}, will retry", currentCommitTime, exception);
       submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
               partition.topic(),
               StringUtils.EMPTY_STRING),
@@ -265,7 +259,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
     try {
       kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.END_COMMIT));
     } catch (Exception exception) {
-      LOG.warn("Could not send END_COMMIT message for partition {} and commitTime {}",
+      log.warn("Could not send END_COMMIT message for partition {} and commitTime {}",
           partition, currentCommitTime, exception);
     }
     currentConsumedKafkaOffsets.clear();
@@ -300,7 +294,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
               transformKafkaOffsets(currentConsumedKafkaOffsets));
 
           if (success) {
-            LOG.info("Commit " + currentCommitTime + " successful!");
+            log.info("Commit {} successful!", currentCommitTime);
             currentState = State.WRITE_STATUS_RCVD;
             globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
            submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
@@ -308,29 +302,29 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
                 currentCommitTime));
             return;
           } else {
-            LOG.error("Commit " + currentCommitTime + " failed!");
+            log.error("Commit {} failed!", currentCommitTime);
           }
         } else if (hasErrors) {
-          LOG.error("Coordinator found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
-          LOG.error("Printing out the top 100 errors");
+          log.error("Coordinator found errors when writing. Errors/Total={}/{}", totalErrorRecords, totalRecords);
+          log.error("Printing out the top 100 errors");
           allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
-            LOG.error("Global error :", ws.getGlobalError());
+            log.error("Global error :", ws.getGlobalError());
             if (ws.getErrors().size() > 0) {
-              ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value));
+              ws.getErrors().forEach((key, value) -> log.trace("Error for key:{} is {}", key, value));
             }
           });
         }
 
         // Submit the next start commit, that will rollback the current commit.
         currentState = State.FAILED_COMMIT;
-        LOG.warn("Current commit {} failed. Starting a new commit after recovery delay of {} {}",
+        log.warn("Current commit {} failed. Starting a new commit after recovery delay of {} {}",
            currentCommitTime, RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS.name());
        submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
                 partition.topic(),
                 StringUtils.EMPTY_STRING),
             RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
       } catch (Exception exception) {
-        LOG.error("Fatal error while committing file", exception);
+        log.error("Fatal error while committing file", exception);
       }
     }
   }
@@ -339,7 +333,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
     // If we are still stuck in ENDED_STATE
     if (currentState.equals(State.ENDED_COMMIT)) {
       currentState = State.WRITE_STATUS_TIMEDOUT;
-      LOG.warn("Current commit {} failed after a write status timeout. Starting a new commit after recovery delay of {} {}",
+      log.warn("Current commit {} failed after a write status timeout. Starting a new commit after recovery delay of {} {}",
          currentCommitTime, RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS.name());
       // Submit the next start commit
      submitEvent(new CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
@@ -353,7 +347,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
     try {
       kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.ACK_COMMIT));
     } catch (Exception exception) {
-      LOG.warn("Could not send ACK_COMMIT message for partition {} and commitTime {}", partition, currentCommitTime, exception);
+      log.warn("Could not send ACK_COMMIT message for partition {} and commitTime {}", partition, currentCommitTime, exception);
     }
     currentState = State.ACKED_COMMIT;
 
@@ -369,11 +363,11 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru
       Map<String, String> commitMetadata = transactionServices.fetchLatestExtraCommitMetadata();
       String latestKafkaOffsets = commitMetadata.get(KAFKA_OFFSET_KEY);
       if (!StringUtils.isNullOrEmpty(latestKafkaOffsets)) {
-        LOG.info("Retrieved Raw Kafka offsets from Hudi Commit File " + latestKafkaOffsets);
+        log.info("Retrieved Raw Kafka offsets from Hudi Commit File {}", latestKafkaOffsets);
        globalCommittedKafkaOffsets = Arrays.stream(latestKafkaOffsets.split(KAFKA_OFFSET_DELIMITER))
            .map(entry -> entry.split(KAFKA_OFFSET_KV_DELIMITER))
            .collect(Collectors.toMap(entry -> Integer.parseInt(entry[0]), entry -> Long.parseLong(entry[1])));
-        LOG.info("Initialized the kafka offset commits " + globalCommittedKafkaOffsets);
+        log.info("Initialized the kafka offset commits {}", globalCommittedKafkaOffsets);
       }
     } catch (Exception exception) {
      throw new HoodieException("Could not deserialize the kafka commit offsets", exception);
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
index dda176c2ba9a..747a29f9606e 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
@@ -28,11 +28,11 @@ import org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -44,12 +44,12 @@ import java.util.concurrent.LinkedBlockingQueue;
  * Implementation of the {@link TransactionParticipant} that coordinates the Hudi write transactions
  * based on events from the {@link TransactionCoordinator} and manages the Hudi Writes for a specific Kafka Partition.
  */
+@Slf4j
 public class ConnectTransactionParticipant implements TransactionParticipant {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ConnectTransactionParticipant.class);
-
   private final LinkedList<SinkRecord> buffer;
   private final BlockingQueue<ControlMessage> controlEvents;
+  @Getter
   private final TopicPartition partition;
   private final SinkTaskContext context;
   private final KafkaControlAgent kafkaControlAgent;
@@ -81,7 +81,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
 
   @Override
   public void start() {
-    LOG.info("Start Hudi Transaction Participant for partition " + partition.partition());
+    log.info("Start Hudi Transaction Participant for partition {}", partition.partition());
     this.kafkaControlAgent.registerTransactionParticipant(this);
     context.pause(partition);
   }
@@ -107,11 +107,6 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
     return committedKafkaOffset;
   }
 
-  @Override
-  public TopicPartition getPartition() {
-    return partition;
-  }
-
   @Override
   public void processRecords() {
     while (!controlEvents.isEmpty()) {
@@ -146,21 +141,21 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
     syncKafkaOffsetWithLeader(message);
     context.resume(partition);
     String currentCommitTime = message.getCommitTime();
-    LOG.info("Started a new transaction after receiving START_COMMIT for commit " + currentCommitTime);
+    log.info("Started a new transaction after receiving START_COMMIT for commit {}", currentCommitTime);
     try {
      ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime, writerProvider.getWriter(currentCommitTime));
       ongoingTransactionInfo.setExpectedKafkaOffset(committedKafkaOffset);
     } catch (Exception exception) {
-      LOG.warn("Failed to start a new transaction", exception);
+      log.warn("Failed to start a new transaction", exception);
     }
   }
 
   private void handleEndCommit(ControlMessage message) {
     if (ongoingTransactionInfo == null) {
-      LOG.warn("END_COMMIT {} is received while we were NOT in active transaction", message.getCommitTime());
+      log.warn("END_COMMIT {} is received while we were NOT in active transaction", message.getCommitTime());
       return;
    } else if (!ongoingTransactionInfo.getCommitTime().equals(message.getCommitTime())) {
-      LOG.error("Fatal error received END_COMMIT with commit time {} while local transaction commit time {}",
+      log.error("Fatal error received END_COMMIT with commit time {} while local transaction commit time {}",
           message.getCommitTime(), ongoingTransactionInfo.getCommitTime());
      // Recovery: A new END_COMMIT from leader caused interruption to an existing transaction,
       // explicitly reset Kafka commit offset to ensure no data loss
@@ -194,7 +189,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
 
       kafkaControlAgent.publishMessage(writeStatusEvent);
     } catch (Exception exception) {
-      LOG.error(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception);
+      log.error("Error writing records and ending commit {} for partition {}", message.getCommitTime(), partition.partition(), exception);
       throw new HoodieIOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()),
           new IOException(exception));
     }
@@ -219,16 +214,16 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
             ongoingTransactionInfo.getWriter().writeRecord(record);
            ongoingTransactionInfo.setExpectedKafkaOffset(record.kafkaOffset() + 1);
          } else if (record != null && record.kafkaOffset() > ongoingTransactionInfo.getExpectedKafkaOffset()) {
-            LOG.warn("Received a kafka record with offset {} above the next expected kafka offset {} for partition {}. "
+            log.warn("Received a kafka record with offset {} above the next expected kafka offset {} for partition {}. "
                + "Resetting the kafka offset to {}", record.kafkaOffset(), ongoingTransactionInfo.getExpectedKafkaOffset(), partition, ongoingTransactionInfo.getExpectedKafkaOffset());
            context.offset(partition, ongoingTransactionInfo.getExpectedKafkaOffset());
          } else if (record != null && record.kafkaOffset() < ongoingTransactionInfo.getExpectedKafkaOffset()) {
-            LOG.info("Received a kafka record with offset {} below the next expected kafka offset {} for partition {}. "
+            log.info("Received a kafka record with offset {} below the next expected kafka offset {} for partition {}. "
                + "No action will be taken but this record will be ignored since its already written", record.kafkaOffset(), ongoingTransactionInfo.getExpectedKafkaOffset(), partition);
           }
           buffer.poll();
         } catch (Exception exception) {
-          LOG.warn("Failed to write records for transaction [{}] in partition [{}]",
+          log.warn("Failed to write records for transaction [{}] in partition [{}]",
              ongoingTransactionInfo.getCommitTime(), partition.partition(), exception);
         }
       }
@@ -241,7 +236,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
         ongoingTransactionInfo.getWriter().close();
         ongoingTransactionInfo = null;
       } catch (HoodieIOException exception) {
-        LOG.warn("Failed to cleanup existing transaction", exception);
+        log.warn("Failed to cleanup existing transaction", exception);
       }
     }
   }
@@ -254,14 +249,14 @@ public class ConnectTransactionParticipant implements TransactionParticipant {
       if (coordinatorCommittedKafkaOffset != null && coordinatorCommittedKafkaOffset >= 0) {
         // Debug only messages
         if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
-          LOG.warn("The coordinator offset for kafka partition {} is {} while the locally committed offset is {}. "
+          log.warn("The coordinator offset for kafka partition {} is {} while the locally committed offset is {}. "
              + "Resetting the local committed offset to the coordinator provided one to ensure consistency", partition, coordinatorCommittedKafkaOffset, committedKafkaOffset);
         }
         committedKafkaOffset = coordinatorCommittedKafkaOffset;
         return;
       }
     } else {
-      LOG.warn("The coordinator offset for kafka partition {} is not present while the locally committed offset is {}. "
+      log.warn("The coordinator offset for kafka partition {} is not present while the locally committed offset is {}. "
          + "Resetting the local committed offset to 0 to avoid data loss", partition, committedKafkaOffset);
     }
     // If the coordinator does not have a committed offset for this partition, reset to zero offset.
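
Note that the removed getPartition() overrides in the two transaction classes
are not lost: a field-level @Getter generates a public method with the same
signature, which still satisfies the TransactionCoordinator and
TransactionParticipant interface contracts. A minimal sketch (the interface is
simplified, not the real one):

    import lombok.Getter;
    import org.apache.kafka.common.TopicPartition;

    interface HasPartition {
      TopicPartition getPartition();
    }

    class Worker implements HasPartition {
      @Getter // Lombok generates public TopicPartition getPartition()
      private final TopicPartition partition;

      Worker(TopicPartition partition) {
        this.partition = partition;
      }
    }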
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
index f9f467a83bec..a998afb191c8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
@@ -20,16 +20,21 @@ package org.apache.hudi.connect.transaction;
 
 import org.apache.hudi.connect.ControlMessage;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
  * The events within the Coordinator that trigger
  * the state changes in the state machine of
  * the Coordinator.
  */
+@Getter
 public class CoordinatorEvent {
 
   private final CoordinatorEventType eventType;
   private final String topicName;
   private final String commitTime;
+  @Setter
   private ControlMessage message;
 
   public CoordinatorEvent(CoordinatorEventType eventType,
@@ -40,26 +45,6 @@ public class CoordinatorEvent {
     this.commitTime = commitTime;
   }
 
-  public CoordinatorEventType getEventType() {
-    return eventType;
-  }
-
-  public String getTopicName() {
-    return topicName;
-  }
-
-  public String getCommitTime() {
-    return commitTime;
-  }
-
-  public ControlMessage getMessage() {
-    return message;
-  }
-
-  public void setMessage(ControlMessage message) {
-    this.message = message;
-  }
-
   /**
    * The type of Coordinator Event.
    */
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
index 7c1852e5fa5c..68e9da875b0f 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
@@ -20,15 +20,20 @@ package org.apache.hudi.connect.transaction;
 
 import org.apache.hudi.connect.writers.ConnectWriter;
 
+import lombok.Getter;
+import lombok.Setter;
+
 /**
  * Stores all the state for the current Transaction within a
  * {@link TransactionParticipant}.
  * @param <T> The type of status returned by the underlying writer.
  */
+@Getter
 public class TransactionInfo<T> {
 
   private final String commitTime;
   private final ConnectWriter<T> writer;
+  @Setter
   private long expectedKafkaOffset;
   private boolean commitInitiated;
 
@@ -39,26 +44,6 @@ public class TransactionInfo<T> {
     this.commitInitiated = false;
   }
 
-  public String getCommitTime() {
-    return commitTime;
-  }
-
-  public ConnectWriter<T> getWriter() {
-    return writer;
-  }
-
-  public long getExpectedKafkaOffset() {
-    return expectedKafkaOffset;
-  }
-
-  public boolean isCommitInitiated() {
-    return commitInitiated;
-  }
-
-  public void setExpectedKafkaOffset(long expectedKafkaOffset) {
-    this.expectedKafkaOffset = expectedKafkaOffset;
-  }
-
   public void commitInitiated() {
     this.commitInitiated = true;
   }
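
One subtlety in the TransactionInfo change above: for a boolean field, Lombok's
@Getter produces an is-prefixed accessor, so the deleted isCommitInitiated()
is regenerated under the same name and the public API is unchanged. A small
sketch:

    import lombok.Getter;

    @Getter
    class TransactionState {
      private boolean commitInitiated;  // @Getter generates isCommitInitiated()
      private long expectedKafkaOffset; // @Getter generates getExpectedKafkaOffset()
    }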
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 266a2d932b71..d77873065b9b 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -39,13 +39,12 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.storage.StorageConfiguration;
 
 import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.FileVisitOption;
@@ -67,9 +66,8 @@ import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 /**
  * Helper methods for Kafka.
  */
+@Slf4j
 public class KafkaConnectUtils {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectUtils.class);
   private static final String HOODIE_CONF_PREFIX = "hoodie.";
   public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
   public static final String HADOOP_HOME = "HADOOP_HOME";
@@ -82,10 +80,10 @@ public class KafkaConnectUtils {
       String hadoopHomePath = System.getenv(HADOOP_HOME);
      DEFAULT_HADOOP_CONF_FILES.addAll(getHadoopConfigFiles(hadoopConfigPath, hadoopHomePath));
       if (!DEFAULT_HADOOP_CONF_FILES.isEmpty()) {
-        LOG.info(String.format("Found Hadoop default config files %s", DEFAULT_HADOOP_CONF_FILES));
+        log.info("Found Hadoop default config files {}", DEFAULT_HADOOP_CONF_FILES);
       }
     } catch (IOException e) {
-      LOG.error("An error occurred while getting the default Hadoop configuration. "
+      log.error("An error occurred while getting the default Hadoop configuration. "
              + "Please use hadoop.conf.dir or hadoop.home to configure Hadoop environment variables", e);
     }
   }
@@ -127,7 +125,7 @@ public class KafkaConnectUtils {
       Map<String, KafkaFuture<TopicDescription>> values = result.values();
       KafkaFuture<TopicDescription> topicDescription = values.get(topicName);
       int numPartitions = topicDescription.get().partitions().size();
-      LOG.info(String.format("Latest number of partitions for topic %s is %s", topicName, numPartitions));
+      log.info("Latest number of partitions for topic {} is {}", topicName, numPartitions);
       return numPartitions;
     } catch (Exception exception) {
      throw new HoodieException("Fatal error fetching the latest partition of kafka topic name" + topicName, exception);
@@ -221,7 +219,7 @@ public class KafkaConnectUtils {
     try {
       md = MessageDigest.getInstance("MD5");
     } catch (NoSuchAlgorithmException e) {
-      LOG.error("Fatal error selecting hash algorithm", e);
+      log.error("Fatal error selecting hash algorithm", e);
       throw new HoodieException(e);
     }
    byte[] digest = Objects.requireNonNull(md).digest(getUTF8Bytes(stringToHash));
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index ff53c9dc28b9..70fefe431603 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -29,10 +29,9 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -43,12 +42,12 @@ import java.util.List;
  * converting them to {@link HoodieRecord}s that can be written to Hudi by
  * the derived implementations of this class.
  */
+@Slf4j
 public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus> {
 
   public static final String KAFKA_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
   public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
   public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectWriter.class);
   protected final String instantTime;
 
   private final KeyGenerator keyGenerator;
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index da6fe5db7da1..5956956479dd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -35,9 +35,8 @@ import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.schema.SchemaProvider;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,10 +47,9 @@ import java.util.List;
  * Specific implementation of a Hudi Writer that buffers all incoming records,
  * and writes them to Hudi files on the end of a transaction using Bulk Insert.
  */
+@Slf4j
 public class BufferedConnectWriter extends AbstractConnectWriter {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BufferedConnectWriter.class);
-
   private final HoodieEngineContext context;
   private final HoodieJavaWriteClient writeClient;
   private final HoodieWriteConfig config;
@@ -75,7 +73,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
     try {
       // Load and batch all incoming records in a map
      long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(context.getTaskContextSupplier(), config);
-      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+      log.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
       this.bufferedRecords = new ExternalSpillableMap<>(memoryForMerge,
           config.getSpillableMapBasePath(),
           new DefaultSizeEstimator(),
@@ -97,12 +95,10 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
   @Override
   public List<WriteStatus> flushRecords() {
     try {
-      LOG.info("Number of entries in MemoryBasedMap => "
-          + bufferedRecords.getInMemoryMapNumEntries()
-          + ", Total size in bytes of MemoryBasedMap => "
-          + bufferedRecords.getCurrentInMemoryMapSize() + ", Number of entries in BitCaskDiskMap => "
-          + bufferedRecords.getDiskBasedMapNumEntries() + ", Size of file spilled to disk => "
-          + bufferedRecords.getSizeOfFileOnDiskInBytes());
+      log.info("Number of entries in MemoryBasedMap => {}, Total size in bytes of MemoryBasedMap => {}, "
+              + "Number of entries in BitCaskDiskMap => {}, Size of file spilled to disk => {}",
+          bufferedRecords.getInMemoryMapNumEntries(), bufferedRecords.getCurrentInMemoryMapSize(),
+          bufferedRecords.getDiskBasedMapNumEntries(), bufferedRecords.getSizeOfFileOnDiskInBytes());
       List<WriteStatus> writeStatuses = new ArrayList<>();
 
      boolean isMorTable = Option.ofNullable(connectConfigs.getString(HoodieTableConfig.TYPE))
@@ -122,7 +118,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
         }
       }
       bufferedRecords.close();
-      LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
+      log.info("Flushed hudi records and got writeStatuses: {}", writeStatuses);
       return writeStatuses;
     } catch (Exception e) {
       throw new HoodieIOException("Write records failed", new IOException(e));
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index c7a4d6314c59..21c76f5aa8c6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -40,10 +40,9 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -57,10 +56,9 @@ import java.util.Set;
  * {@link TransactionCoordinator}
  * using {@link HoodieJavaWriteClient}.
  */
+@Slf4j
 public class KafkaConnectTransactionServices implements ConnectTransactionServices {
 
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectTransactionServices.class);
-
   private final KafkaConnectConfigs connectConfigs;
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final StorageConfiguration<Configuration> storageConf;
@@ -91,8 +89,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
      String partitionColumns = KafkaConnectUtils.getPartitionColumnsForKeyGenerator(keyGenerator,
           connectConfigs.getProps());
 
-      LOG.info(String.format("Setting record key %s and partition fields %s for table %s",
-          recordKeyFields, partitionColumns, tableBasePath + tableName));
+      log.info("Setting record key {} and partition fields {} for table {}", recordKeyFields, partitionColumns, tableBasePath + tableName);
 
       tableMetaClient = Option.of(HoodieTableMetaClient.newTableBuilder()
           .setTableType(HoodieTableType.COPY_ON_WRITE.name())
@@ -116,7 +113,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   public String startCommit() {
     String newCommitTime = javaClient.startCommit();
     javaClient.transitionInflight(newCommitTime);
-    LOG.info("Starting Hudi commit " + newCommitTime);
+    log.info("Starting Hudi commit {}", newCommitTime);
     return newCommitTime;
   }
 
@@ -124,16 +121,16 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
     boolean success = javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     if (success) {
-      LOG.info("Ending Hudi commit " + commitTime);
+      log.info("Ending Hudi commit {}", commitTime);
 
       // Schedule clustering and compaction as needed.
       if (writeConfig.isAsyncClusteringEnabled()) {
         javaClient.scheduleClustering(Option.empty()).ifPresent(
-            instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
+            instantTs -> log.info("Scheduled clustering at instant time:{}", instantTs));
       }
       if (isAsyncCompactionEnabled()) {
         javaClient.scheduleCompaction(Option.empty()).ifPresent(
-            instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
+            instantTs -> log.info("Scheduled compaction at instant time:{}", instantTs));
       }
       syncMeta();
     }
@@ -147,7 +144,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
       if (metadata.isPresent()) {
         return metadata.get().getExtraMetadata();
       } else {
-        LOG.info("Hoodie Extra Metadata from latest commit is absent");
+        log.info("Hoodie Extra Metadata from latest commit is absent");
         return Collections.emptyMap();
       }
     }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
index acf8baacf3e9..92ed7de4484e 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
@@ -41,10 +41,9 @@ import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.schema.SchemaProvider;
 import org.apache.hudi.storage.StorageConfiguration;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 
@@ -52,10 +51,9 @@ import java.util.Collections;
  * Provides the Hudi Writer for the {@link org.apache.hudi.connect.transaction.TransactionParticipant}
  * to write the incoming records to Hudi.
  */
+@Slf4j
 public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteStatus> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectWriterProvider.class);
-
   private final KafkaConnectConfigs connectConfigs;
   private final HoodieEngineContext context;
   private final HoodieWriteConfig writeConfig;
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
index d939351a58f6..5ce1710bb785 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.helper.MockConnectTransactionServices;
 import org.apache.hudi.helper.MockKafkaControlAgent;
 
+import lombok.Getter;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.BeforeEach;
@@ -107,6 +108,7 @@ public class TestConnectTransactionCoordinator {
   private static class MockParticipant implements TransactionParticipant {
 
     private final MockKafkaControlAgent kafkaControlAgent;
+    @Getter
     private final TopicPartition partition;
     private final CountDownLatch latch;
     private final TestScenarios testScenario;
@@ -148,11 +150,6 @@ public class TestConnectTransactionCoordinator {
     public void processRecords() {
     }
 
-    @Override
-    public TopicPartition getPartition() {
-      return partition;
-    }
-
     @Override
     public void processControlEvent(ControlMessage message) {
      assertEquals(message.getSenderType(), ControlMessage.EntityType.COORDINATOR);
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
index a1c1c7ac849c..50c3c45bcfe2 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
@@ -28,6 +28,7 @@ import org.apache.hudi.helper.MockKafkaConnect;
 import org.apache.hudi.helper.MockKafkaControlAgent;
 import org.apache.hudi.helper.TestHudiWriterProvider;
 
+import lombok.Getter;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -194,9 +195,12 @@ public class TestConnectTransactionParticipant {
     }
 
     private final KafkaControlAgent kafkaControlAgent;
+    @Getter
     private final TopicPartition partition;
 
+    @Getter
     private Option<ControlMessage> lastReceivedWriteStatusEvent;
+    @Getter
     private long committedKafkaOffset;
 
     public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
@@ -229,14 +233,6 @@ public class TestConnectTransactionParticipant {
       }
     }
 
-    public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
-      return lastReceivedWriteStatusEvent;
-    }
-
-    public long getCommittedKafkaOffset() {
-      return committedKafkaOffset;
-    }
-
     @Override
     public void start() {
       kafkaControlAgent.registerTransactionCoordinator(this);
@@ -247,11 +243,6 @@ public class TestConnectTransactionParticipant {
       kafkaControlAgent.deregisterTransactionCoordinator(this);
     }
 
-    @Override
-    public TopicPartition getPartition() {
-      return partition;
-    }
-
     @Override
     public void processControlEvent(ControlMessage message) {
       if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
index 66ee2b597cf7..ea0209c419f9 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
@@ -20,6 +20,8 @@ package org.apache.hudi.helper;
 
 import org.apache.hudi.connect.transaction.TransactionParticipant;
 
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -44,8 +46,11 @@ public class MockKafkaConnect implements SinkTaskContext {
 
   private final TopicPartition testPartition;
 
+  @Setter
   private TransactionParticipant participant;
+  @Getter
   private long currentKafkaOffset;
+  @Getter
   private boolean isPaused;
   private boolean isResetOffset;
 
@@ -56,22 +61,10 @@ public class MockKafkaConnect implements SinkTaskContext {
     isResetOffset = false;
   }
 
-  public void setParticipant(TransactionParticipant participant) {
-    this.participant = participant;
-  }
-
-  public boolean isPaused() {
-    return isPaused;
-  }
-
   public boolean isResumed() {
     return !isPaused;
   }
 
-  public long getCurrentKafkaOffset() {
-    return currentKafkaOffset;
-  }
-
   @Override
   public void pause(TopicPartition... partitions) {
     if (Arrays.stream(partitions).allMatch(testPartition::equals)) {
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
index 45c9b03725f5..87fd5ab36e10 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.connect.writers.ConnectWriter;
 import org.apache.hudi.connect.writers.ConnectWriterProvider;
 
+import lombok.Getter;
+import lombok.NoArgsConstructor;
 import org.apache.kafka.connect.sink.SinkRecord;
 
 import java.util.List;
@@ -30,13 +32,11 @@ import java.util.List;
  * Helper class the provides a Hudi writer and
  * maintains stats that are used for test validation.
  */
+@NoArgsConstructor
 public class TestHudiWriterProvider implements ConnectWriterProvider<WriteStatus>  {
 
   private TestHudiWriter currentWriter;
 
-  public TestHudiWriterProvider() {
-  }
-
   public int getLatestNumberWrites() {
     return (currentWriter != null) ? currentWriter.numberRecords : 0;
   }
@@ -51,6 +51,7 @@ public class TestHudiWriterProvider implements ConnectWriterProvider<WriteStatus
     return currentWriter;
   }
 
+  @Getter
   private static class TestHudiWriter implements ConnectWriter<WriteStatus> {
 
     private int numberRecords;
@@ -61,14 +62,6 @@ public class TestHudiWriterProvider implements ConnectWriterProvider<WriteStatus
       this.isClosed = false;
     }
 
-    public int getNumberRecords() {
-      return numberRecords;
-    }
-
-    public boolean isClosed() {
-      return isClosed;
-    }
-
     @Override
     public void writeRecord(SinkRecord record) {
       numberRecords++;
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
index ce279c89df6a..f9bd2cccbb1d 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.schema.SchemaProvider;
 
+import lombok.Getter;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -147,6 +148,7 @@ public class TestAbstractConnectWriter {
 
   private static class AbstractHudiConnectWriterTestWrapper extends AbstractConnectWriter {
 
+    @Getter
     private List<HoodieRecord> writtenRecords;
 
    public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
@@ -154,10 +156,6 @@ public class TestAbstractConnectWriter {
       writtenRecords = new ArrayList<>();
     }
 
-    public List<HoodieRecord> getWrittenRecords() {
-      return writtenRecords;
-    }
-
     @Override
     protected void writeHudiRecord(HoodieRecord<?> record) {
       writtenRecords.add(record);
