This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 63698543a78f refactor: Add lombok annotations to hudi-flink-client module (#17534)
63698543a78f is described below

commit 63698543a78fb0c377e507728b48866e4e5a28fb
Author: voonhous <[email protected]>
AuthorDate: Wed Dec 10 10:15:10 2025 +0800

    refactor: Add lombok annotations to hudi-flink-client module (#17534)
---
 hudi-client/hudi-flink-client/pom.xml              |  6 +++
 .../hudi/client/HoodieFlinkTableServiceClient.java | 14 +++---
 .../apache/hudi/client/HoodieFlinkWriteClient.java | 17 +++----
 .../FlinkSizeBasedClusteringPlanStrategy.java      |  5 +-
 .../hudi/client/model/HoodieFlinkInternalRow.java  |  6 +--
 .../hudi/index/state/FlinkInMemoryStateIndex.java  |  6 +--
 .../apache/hudi/io/ExplicitWriteHandleFactory.java | 13 ++----
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |  8 ++--
 .../java/org/apache/hudi/io/FlinkConcatHandle.java |  7 ++-
 .../java/org/apache/hudi/io/FlinkCreateHandle.java | 16 +++----
 ...FileGroupReaderBasedIncrementalMergeHandle.java |  6 +--
 .../io/FlinkFileGroupReaderBasedMergeHandle.java   | 14 +++---
 .../hudi/io/FlinkIncrementalConcatHandle.java      |  7 ++-
 .../hudi/io/FlinkIncrementalMergeHandle.java       |  6 +--
 .../FlinkIncrementalMergeHandleWithChangeLog.java  |  6 +--
 .../java/org/apache/hudi/io/FlinkMergeHandle.java  | 16 +++----
 .../hudi/io/FlinkMergeHandleWithChangeLog.java     |  6 +--
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  7 ++-
 .../row/parquet/ParquetSchemaConverter.java        |  5 +-
 .../apache/hudi/io/v2/RowDataLogWriteHandle.java   |  8 ++--
 .../FlinkHoodieBackedTableMetadataWriter.java      |  5 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  8 ++--
 .../commit/BaseFlinkCommitActionExecutor.java      | 10 ++--
 .../commit/FlinkPartitionTTLActionExecutor.java    | 10 ++--
 .../HoodieFlinkMergeOnReadTableCompactor.java      |  7 ++-
 .../hudi/table/format/FlinkRecordContext.java      |  6 +--
 .../table/upgrade/FlinkUpgradeDowngradeHelper.java |  7 +--
 .../apache/hudi/util/AvroToRowDataConverters.java  |  6 +--
 .../apache/hudi/util/RowDataAvroQueryContexts.java | 53 ++++++----------------
 .../testutils/HoodieFlinkWriteableTestTable.java   |  7 ++-
 30 files changed, 117 insertions(+), 181 deletions(-)
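
The bulk of this change follows a single pattern: each hand-written SLF4J logger field is removed and the class is annotated with Lombok's @Slf4j, which generates an equivalent private static final logger named "log" at compile time (hence the LOG -> log renames in the log statements below). A minimal before/after sketch of that pattern, using a made-up class name for illustration only:

    // Before: explicit logger declaration
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SomeFlinkHandle {
      private static final Logger LOG = LoggerFactory.getLogger(SomeFlinkHandle.class);

      public void doWork() {
        LOG.info("working");
      }
    }

    // After: @Slf4j generates the same field, named "log" by convention
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class SomeFlinkHandle {
      public void doWork() {
        log.info("working");
      }
    }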

diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index 235a173ddaf3..494fcd9cbf01 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -126,6 +126,12 @@
             <artifactId>kryo-shaded</artifactId>
         </dependency>
 
+        <!-- Lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
         <!-- Hoodie - Test -->
         <dependency>
             <groupId>org.apache.hudi</groupId>
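
Lombok is a compile-time-only annotation processor, so the generated getters, setters, constructors, and logger fields end up in the compiled bytecode without a runtime dependency on the Lombok jar. The <dependency> entry above omits a <version>, so the version is presumably managed centrally (for example in a parent pom's dependencyManagement section).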
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index a0580175ea2f..74acada85bcc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -48,17 +48,15 @@ import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.FlinkClientUtil;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.text.ParseException;
 import java.util.List;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClient<List<HoodieRecord<T>>, List<WriteStatus>, 
List<WriteStatus>> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);
-
   protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
                                           HoodieWriteConfig clientConfig,
                                           Option<EmbeddedTimelineService> 
timelineService) {
@@ -82,7 +80,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       // Do not do any conflict resolution here as we do with regular writes. 
We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata 
table will result in conflicts since all of them updates the same partition.
       writeTableMetadata(table, compactionCommitTime, metadata);
-      LOG.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
+      log.info("Committing Compaction {} finished with result {}.", 
compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, 
compactionCommitTime, metadata);
     } finally {
       this.txnManager.endStateChange(Option.of(compactionInstant));
@@ -100,7 +98,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
             + config.getBasePath() + " at time " + compactionCommitTime, e);
       }
     }
-    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+    log.info("Compacted successfully on commit " + compactionCommitTime);
   }
 
   protected void completeClustering(
@@ -129,7 +127,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
       // Because more than one write to metadata table will result in 
conflicts since all of them updates the same partition.
       writeTableMetadata(table, clusteringCommitTime, metadata);
 
-      LOG.info("Committing Clustering {} finished with result {}.", 
clusteringCommitTime, metadata);
+      log.info("Committing Clustering {} finished with result {}.", 
clusteringCommitTime, metadata);
       ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(
           false,
           clusteringInstant,
@@ -155,7 +153,7 @@ public class HoodieFlinkTableServiceClient<T> extends 
BaseHoodieTableServiceClie
             + config.getBasePath() + " at time " + clusteringCommitTime, e);
       }
     }
-    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+    log.info("Clustering successfully on commit " + clusteringCommitTime);
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index d380b934de27..82f15e8832cd 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -50,9 +50,10 @@ import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
 
 import com.codahale.metrics.Timer;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Comparator;
@@ -71,13 +72,12 @@ import java.util.stream.Collectors;
  *
  * @param <T> type of the payload
  */
+@Slf4j
 @SuppressWarnings("checkstyle:LineLength")
 public class HoodieFlinkWriteClient<T>
     extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>>
     implements FlinkRowDataWriteClient<T> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
-
   /**
    * FileID to write handle mapping in order to record the write handles for 
each file group,
    * so that we can append the mini-batch data buffer incrementally.
@@ -341,9 +341,9 @@ public class HoodieFlinkWriteClient<T>
    */
   public void waitForCleaningFinish() {
     if (tableServiceClient.asyncCleanerService != null) {
-      LOG.info("Cleaner has been spawned already. Waiting for it to finish");
+      log.info("Cleaner has been spawned already. Waiting for it to finish");
       tableServiceClient.asyncClean();
-      LOG.info("Cleaner has finished");
+      log.info("Cleaner has finished");
     }
   }
 
@@ -495,6 +495,7 @@ public class HoodieFlinkWriteClient<T>
   }
 
   private final class AutoCloseableWriteHandle implements AutoCloseable {
+    @Getter(AccessLevel.PACKAGE)
     private final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
 
     AutoCloseableWriteHandle(
@@ -514,10 +515,6 @@ public class HoodieFlinkWriteClient<T>
       this.writeHandle = getOrCreateWriteHandle(bucketInfo, getConfig(), 
instantTime, table, recordIterator, overwrite);
     }
 
-    HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
-      return writeHandle;
-    }
-
     @Override
     public void close() {
       ((MiniBatchHandle) writeHandle).closeGracefully();
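
In the hunk above, the package-private getWriteHandle() accessor of the inner AutoCloseableWriteHandle class is replaced by @Getter(AccessLevel.PACKAGE), which makes Lombok generate the getter with package-private visibility instead of the default public. A minimal sketch of that feature, with a hypothetical class and field used only for illustration:

    import lombok.AccessLevel;
    import lombok.Getter;

    class HandleHolder {
      // Lombok generates a package-private accessor: Object getWriteHandle()
      @Getter(AccessLevel.PACKAGE)
      private final Object writeHandle;

      HandleHolder(Object writeHandle) {
        this.writeHandle = writeHandle;
      }
    }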
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index a6684339b773..d5f82532c649 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -31,8 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import 
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.List;
@@ -46,9 +45,9 @@ import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
  * 1) Creates clustering groups based on max size allowed per group.
  * 2) Excludes files that are greater than 'small.file.limit' from clustering 
plan.
  */
+@Slf4j
 public class FlinkSizeBasedClusteringPlanStrategy<T>
     extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
 
   public FlinkSizeBasedClusteringPlanStrategy(HoodieTable table,
                                               HoodieEngineContext 
engineContext,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
index 608301e4d901..c54e41eb3b21 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.model;
 
 import org.apache.hudi.common.model.HoodieOperation;
 
+import lombok.Getter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -55,6 +56,7 @@ public class HoodieFlinkInternalRow implements Serializable {
   // there is no rowData for index record
   private final BooleanValue isIndexRecord;
 
+  @Getter
   private final RowData rowData;
 
   public HoodieFlinkInternalRow(String recordKey, String partitionPath, String 
operationType, RowData rowData) {
@@ -118,10 +120,6 @@ public class HoodieFlinkInternalRow implements 
Serializable {
     return isIndexRecord.getValue();
   }
 
-  public RowData getRowData() {
-    return rowData;
-  }
-
   public HoodieFlinkInternalRow copy(RowDataSerializer rowDataSerializer) {
     return new HoodieFlinkInternalRow(
         this.recordKey.toString(),
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
index 693286df3a66..1d38fb7b7b18 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
@@ -28,18 +28,16 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
 
 /**
  * Hoodie index implementation backed by flink state.
  */
+@Slf4j
 public class FlinkInMemoryStateIndex extends HoodieIndex<List<HoodieRecord>, 
List<WriteStatus>> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkInMemoryStateIndex.class);
-
   public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, 
HoodieWriteConfig config) {
     super(config);
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
index c040603db4c4..68cf4af675af 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
@@ -22,17 +22,18 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 /**
  * Create handle factory for Flink writer, use the specified write handle 
directly.
  */
+@AllArgsConstructor
+@Getter
 public class ExplicitWriteHandleFactory<T, I, K, O>
     extends WriteHandleFactory<T, I, K, O> {
   private final HoodieWriteHandle<T, I, K, O> writeHandle;
 
-  public ExplicitWriteHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) 
{
-    this.writeHandle = writeHandle;
-  }
-
   @Override
   public HoodieWriteHandle<T, I, K, O> create(
       HoodieWriteConfig hoodieConfig, String commitTime,
@@ -40,8 +41,4 @@ public class ExplicitWriteHandleFactory<T, I, K, O>
       String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
     return writeHandle;
   }
-
-  public HoodieWriteHandle<T, I, K, O> getWriteHandle() {
-    return writeHandle;
-  }
 }
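
Here the explicit constructor and getWriteHandle() of ExplicitWriteHandleFactory are replaced by class-level @AllArgsConstructor and @Getter: Lombok generates a constructor taking every field in declaration order plus a public getter for each field. A rough sketch of the equivalence on a hypothetical value class (names are illustrative only):

    import lombok.AllArgsConstructor;
    import lombok.Getter;

    // Generates: WrappedHandle(String fileId, long sizeBytes)
    // plus getFileId() and getSizeBytes() accessors.
    @AllArgsConstructor
    @Getter
    class WrappedHandle {
      private final String fileId;
      private final long sizeBytes;
    }

    // Usage sketch:
    //   WrappedHandle handle = new WrappedHandle("file-001", 128L);
    //   handle.getFileId();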
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index f40fb84b21b0..030743477ff2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -28,8 +28,7 @@ import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Iterator;
 import java.util.List;
@@ -44,11 +43,10 @@ import java.util.List;
  * <p>The back-up writer may rollover on condition(for e.g, the filesystem 
does not support append
  * or the file size hits the configured threshold).
  */
+@Slf4j
 public class FlinkAppendHandle<T, I, K, O>
     extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkAppendHandle.class);
-
   private boolean isClosed = false;
   private final WriteMarkers writeMarkers;
   private final BucketType bucketType;
@@ -119,7 +117,7 @@ public class FlinkAppendHandle<T, I, K, O>
     } catch (Throwable throwable) {
       // The intermediate log file can still append based on the incremental 
MERGE semantics,
       // there is no need to delete the file.
-      LOG.warn("Failed to close the APPEND handle", throwable);
+      log.warn("Failed to close the APPEND handle", throwable);
     }
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
index 5979891490fc..f7249b88e9d1 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
@@ -25,8 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -38,9 +37,9 @@ import java.util.Iterator;
  * <P>The records iterator for super constructor is reset as empty thus the 
initialization for new records
  * does nothing. This handle keep the iterator for itself to override the 
write behavior.
  */
+@Slf4j
 public class FlinkConcatHandle<T, I, K, O>
     extends FlinkMergeHandle<T, I, K, O> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkConcatHandle.class);
 
   // a representation of incoming records that tolerates duplicate keys
   private final Iterator<HoodieRecord<T>> recordItr;
@@ -65,7 +64,7 @@ public class FlinkConcatHandle<T, I, K, O>
       String errMsg = String.format(
           "Failed to write old record into new file for key %s from old file 
%s to new file %s with writerSchema %s",
           key, getOldFilePath(), newFilePath, oldSchema.toString(true));
-      LOG.debug("Old record is {}", oldRecord);
+      log.debug("Old record is {}", oldRecord);
       throw new HoodieUpsertException(errMsg, e);
     }
     recordsWritten++;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index d277639af973..038c6f8664e6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -31,8 +31,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.List;
@@ -47,11 +46,10 @@ import java.util.List;
  *
  * @see FlinkIncrementalMergeHandle
  */
+@Slf4j
 public class FlinkCreateHandle<T, I, K, O>
     extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCreateHandle.class);
-
   private boolean isClosed = false;
 
   public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
@@ -92,7 +90,7 @@ public class FlinkCreateHandle<T, I, K, O>
     final StoragePath path = makeNewFilePath(partitionPath, lastDataFileName);
     try {
       if (storage.exists(path)) {
-        LOG.info("Deleting invalid INSERT file due to task retry: " + 
lastDataFileName);
+        log.info("Deleting invalid INSERT file due to task retry: " + 
lastDataFileName);
         storage.deleteFile(path);
       }
     } catch (IOException e) {
@@ -118,7 +116,7 @@ public class FlinkCreateHandle<T, I, K, O>
       while (storage.exists(path)) {
         StoragePath existing = path;
         path = newFilePathWithRollover(rollNumber++);
-        LOG.warn("Duplicate write for INSERT bucket with path: {}. Will write 
to new path [{}] instead", existing, path);
+        log.warn("Duplicate write for INSERT bucket with path: {}. Will write 
to new path [{}] instead", existing, path);
       }
       return path;
     } catch (IOException e) {
@@ -157,13 +155,13 @@ public class FlinkCreateHandle<T, I, K, O>
     try {
       close();
     } catch (Throwable throwable) {
-      LOG.error("Failed to close the CREATE handle", throwable);
+      log.error("Failed to close the CREATE handle", throwable);
       try {
         storage.deleteFile(path);
-        LOG.info("Successfully deleted the intermediate CREATE data file: {}", 
path);
+        log.info("Successfully deleted the intermediate CREATE data file: {}", 
path);
       } catch (IOException e) {
         // logging a warning and ignore the exception.
-        LOG.warn("Failed to delete the intermediate CREATE data file: {}", 
path, e);
+        log.warn("Failed to delete the intermediate CREATE data file: {}", 
path, e);
       }
     }
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
index a8e122c50f20..fa5feb8649b5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
@@ -28,8 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -46,12 +45,11 @@ import java.util.List;
  * for the previous mini-batch ingestion will be rewritten as the last file 
path before committing,
  * which behaves like all the data are written into the last file.
  */
+@Slf4j
 public class FlinkFileGroupReaderBasedIncrementalMergeHandle<T, I, K, O>
     extends FlinkFileGroupReaderBasedMergeHandle<T, I, K, O>
     implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFileGroupReaderBasedIncrementalMergeHandle.class);
-
   public FlinkFileGroupReaderBasedIncrementalMergeHandle(HoodieWriteConfig 
config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                                          
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                                                          TaskContextSupplier 
taskContextSupplier, StoragePath basePath) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
index 7ab517662b2a..fa68e055cec5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
@@ -30,8 +30,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -46,12 +45,11 @@ import java.util.Iterator;
  * the file path when the data buffer writes finish. When next data buffer 
write starts,
  * {@link FlinkFileGroupReaderBasedIncrementalMergeHandle} will be used to 
write records into a rollover file.
  */
+@Slf4j
 public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O>
     extends FileGroupReaderBasedMergeHandle<T, I, K, O>
     implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFileGroupReaderBasedMergeHandle.class);
-
   public FlinkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                               Iterator<HoodieRecord<T>> 
recordItr, String partitionPath, String fileId,
                                               TaskContextSupplier 
taskContextSupplier) {
@@ -104,7 +102,7 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, 
O>
     }
     try {
       if (storage.exists(path)) {
-        LOG.info("Deleting invalid MERGE base file due to task retry: {}", 
lastDataFileName);
+        log.info("Deleting invalid MERGE base file due to task retry: {}", 
lastDataFileName);
         storage.deleteFile(path);
       }
     } catch (IOException e) {
@@ -133,13 +131,13 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, 
K, O>
     try {
       close();
     } catch (Throwable throwable) {
-      LOG.error("Failed to close the MERGE handle", throwable);
+      log.error("Failed to close the MERGE handle", throwable);
       try {
         storage.deleteFile(newFilePath);
-        LOG.info("Successfully deleted the intermediate MERGE data file: {}", 
newFilePath);
+        log.info("Successfully deleted the intermediate MERGE data file: {}", 
newFilePath);
       } catch (IOException e) {
         // logging a warning and ignore the exception.
-        LOG.warn("Failed to delete the intermediate MERGE data file: {}", 
newFilePath, e);
+        log.warn("Failed to delete the intermediate MERGE data file: {}", 
newFilePath, e);
 
       }
     }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
index 3b4fee36efef..c35416620d3c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
@@ -26,8 +26,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -39,9 +38,9 @@ import java.util.Iterator;
  * <P>The records iterator for super constructor is reset as empty thus the 
initialization for new records
  * does nothing. This handle keep the iterator for itself to override the 
write behavior.
  */
+@Slf4j
 public class FlinkIncrementalConcatHandle<T, I, K, O>
     extends FlinkIncrementalMergeHandle<T, I, K, O> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkIncrementalConcatHandle.class);
 
   // a representation of incoming records that tolerates duplicate keys
   private final Iterator<HoodieRecord<T>> recordItr;
@@ -66,7 +65,7 @@ public class FlinkIncrementalConcatHandle<T, I, K, O>
       String errMsg = String.format(
           "Failed to write old record into new file for key %s from old file 
%s to new file %s with writerSchema %s",
           key, getOldFilePath(), newFilePath, 
writeSchemaWithMetaFields.toString(true));
-      LOG.debug("Old record is {}", oldRecord);
+      log.debug("Old record is {}", oldRecord);
       throw new HoodieUpsertException(errMsg, e);
     }
     recordsWritten++;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
index 9f3680af0543..162e8ef4c957 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
@@ -28,8 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -46,12 +45,11 @@ import java.util.List;
  * for the previous mini-batch ingestion will be rewritten as the final file 
path before committing,
  * which behaves like all the data are written into the last file.
  */
+@Slf4j
 public class FlinkIncrementalMergeHandle<T, I, K, O>
     extends FlinkMergeHandle<T, I, K, O>
     implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkIncrementalMergeHandle.class);
-
   public FlinkIncrementalMergeHandle(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                      Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
                                      TaskContextSupplier taskContextSupplier, 
StoragePath basePath) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
index d4b2cb989c8c..8b6ea645905b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
@@ -30,10 +30,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -45,11 +44,10 @@ import java.util.List;
  * <p>The cdc about logic is copied from {@link 
HoodieMergeHandleWithChangeLog},
  * we should refactor it out when there are good abstractions.
  */
+@Slf4j
 public class FlinkIncrementalMergeHandleWithChangeLog<T, I, K, O>
     extends FlinkIncrementalMergeHandle<T, I, K, O> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkIncrementalMergeHandleWithChangeLog.class);
-
   private final HoodieCDCLogger cdcLogger;
 
   public FlinkIncrementalMergeHandleWithChangeLog(HoodieWriteConfig config, 
String instantTime, HoodieTable<T, I, K, O> hoodieTable,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index e477a02527e5..3505a0aeea15 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -30,8 +30,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -47,12 +46,11 @@ import java.util.Iterator;
  * the file path when the data buffer writes finish. When next data buffer 
write starts,
  * {@link FlinkIncrementalMergeHandle} will be used to write records into a 
rollover file.
  */
+@Slf4j
 public class FlinkMergeHandle<T, I, K, O>
     extends HoodieWriteMergeHandle<T, I, K, O>
     implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMergeHandle.class);
-
   public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                           Iterator<HoodieRecord<T>> recordItr, String 
partitionPath, String fileId,
                           TaskContextSupplier taskContextSupplier) {
@@ -98,7 +96,7 @@ public class FlinkMergeHandle<T, I, K, O>
     }
     try {
       if (storage.exists(path)) {
-        LOG.info("Deleting invalid MERGE base file due to task retry: {}", 
lastDataFileName);
+        log.info("Deleting invalid MERGE base file due to task retry: {}", 
lastDataFileName);
         storage.deleteFile(path);
       }
     } catch (IOException e) {
@@ -114,7 +112,7 @@ public class FlinkMergeHandle<T, I, K, O>
 
   @Override
   protected void initIncomingRecordsMap() {
-    LOG.info("Initialize on-heap keyToNewRecords for incoming records.");
+    log.info("Initialize on-heap keyToNewRecords for incoming records.");
     // the incoming records are already buffered on heap and the underlying 
bytes are managed by memory pool
     // in Flink write buffer, so there is no need to use ExternalSpillableMap.
     this.keyToNewRecords = new HashMap<>();
@@ -135,13 +133,13 @@ public class FlinkMergeHandle<T, I, K, O>
     try {
       close();
     } catch (Throwable throwable) {
-      LOG.error("Failed to close the MERGE handle", throwable);
+      log.error("Failed to close the MERGE handle", throwable);
       try {
         storage.deleteFile(newFilePath);
-        LOG.info("Successfully deleted the intermediate MERGE data file: {}", 
newFilePath);
+        log.info("Successfully deleted the intermediate MERGE data file: {}", 
newFilePath);
       } catch (IOException e) {
         // logging a warning and ignore the exception.
-        LOG.warn("Failed to delete the intermediate MERGE data file: {}", 
newFilePath, e);
+        log.warn("Failed to delete the intermediate MERGE data file: {}", 
newFilePath, e);
       }
     }
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index bc1af16cfc76..17f48e9e3131 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -29,10 +29,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -44,12 +43,11 @@ import java.util.List;
  * <p>The cdc about logic is copied from {@link 
HoodieMergeHandleWithChangeLog},
  * we should refactor it out when there are good abstractions.
  */
+@Slf4j
 public class FlinkMergeHandleWithChangeLog<T, I, K, O>
     extends FlinkMergeHandle<T, I, K, O> {
   private final HoodieCDCLogger cdcLogger;
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMergeHandleWithChangeLog.class);
-
   public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String 
instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                        Iterator<HoodieRecord<T>> recordItr, 
String partitionPath, String fileId,
                                        TaskContextSupplier 
taskContextSupplier) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 170b6e46c34d..f54254baebfb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -39,11 +39,10 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -54,10 +53,10 @@ import static 
org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
 /**
  * Create handle with RowData for datasource implementation of bulk insert.
  */
+@Slf4j
 public class HoodieRowDataCreateHandle implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRowDataCreateHandle.class);
   private static final AtomicLong SEQGEN = new AtomicLong(1);
 
   private final String instantTime;
@@ -115,7 +114,7 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize file writer for 
path " + path, e);
     }
-    LOG.info("New handle created for partition :" + partitionPath + " with 
fileId " + fileId);
+    log.info("New handle created for partition :" + partitionPath + " with 
fileId " + fileId);
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index ec3fbc581d3c..ec8a6fe4dca4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io.storage.row.parquet;
 
 import org.apache.hudi.common.util.collection.Pair;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -37,8 +38,6 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -50,8 +49,8 @@ import static 
org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
  *
  * <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter 
to support timestamp of INT64 8 bytes.
  */
+@Slf4j
 public class ParquetSchemaConverter {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ParquetSchemaConverter.class);
 
   static final String MAP_REPEATED_NAME = "key_value";
   static final String MAP_KEY_NAME = "key";
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
index c7f00da98d3f..f7df0ff269cf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
@@ -42,8 +42,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.util.Lazy;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -68,11 +67,10 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_CO
  * <p>The back-up writer may roll over to a new log file if there already 
exists a log file for the
  * given file group and instant.
  */
+@Slf4j
 public class RowDataLogWriteHandle<T, I, K, O>
     extends FlinkAppendHandle<T, I, K, O> implements MiniBatchHandle {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(RowDataLogWriteHandle.class);
-
   public RowDataLogWriteHandle(
       HoodieWriteConfig config,
       String instantTime,
@@ -132,7 +130,7 @@ public class RowDataLogWriteHandle<T, I, K, O>
     }
     resetWriteCounts();
     assert stat.getRuntimeStats() != null;
-    LOG.info("WriteHandle for partitionPath {} filePath {}, took {} ms.",
+    log.info("WriteHandle for partitionPath {} filePath {}, took {} ms.",
         partitionPath, stat.getPath(), 
stat.getRuntimeStats().getTotalUpsertTime());
     timer.startTimer();
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index d02ec7b5f90a..e69a7fbbc622 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -39,8 +39,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Collections;
 import java.util.List;
@@ -51,8 +50,8 @@ import static 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGE
 /**
  * Flink hoodie backed table metadata writer.
  */
+@Slf4j
 public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetadataWriter<List<HoodieRecord>, List<WriteStatus>> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
 
   public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, 
HoodieWriteConfig writeConfig,
                                                  HoodieEngineContext context) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index aee1da3a8134..a6cf00fb6f26 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -41,8 +41,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.IOUtils;
@@ -69,8 +69,7 @@ import 
org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -86,11 +85,10 @@ import java.util.Map;
  * <p>
  * UPDATES - Produce a new version of the file, just replacing the updated 
records with new values
  */
+@Slf4j
 public class HoodieFlinkCopyOnWriteTable<T>
     extends HoodieFlinkTable<T> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkCopyOnWriteTable.class);
-
   public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, 
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 7a4d9f90e5e9..b8363a9f052c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -34,14 +34,13 @@ import org.apache.hudi.execution.FlinkLazyInsertIterable;
 import org.apache.hudi.io.ExplicitWriteHandleFactory;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -63,11 +62,10 @@ import java.util.stream.Collectors;
  * <p>Computing the records batch locations all at a time is a pressure to the 
engine,
  * we should avoid that in streaming system.
  */
+@Slf4j
 public abstract class BaseFlinkCommitActionExecutor<T> extends
     BaseCommitActionExecutor<T, Iterator<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>, HoodieWriteMetadata> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(BaseFlinkCommitActionExecutor.class);
-
   protected HoodieWriteHandle<?, ?, ?, ?> writeHandle;
 
   protected final BucketInfo bucketInfo;
@@ -182,7 +180,7 @@ public abstract class BaseFlinkCommitActionExecutor<T> 
extends
       }
     } catch (Throwable t) {
       String msg = "Error upserting bucketType " + bucketType + " for 
partition :" + partitionPath;
-      LOG.error(msg, t);
+      log.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
index d8f3f79b1ec6..dc3b38c5d810 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
@@ -31,17 +31,15 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.ttl.strategy.HoodiePartitionTTLStrategyFactory;
 import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategy;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+@Slf4j
 public class FlinkPartitionTTLActionExecutor<T> extends 
BaseFlinkCommitActionExecutor<T> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPartitionTTLActionExecutor.class);
-
   public FlinkPartitionTTLActionExecutor(HoodieEngineContext context,
                                          HoodieWriteConfig config,
                                          HoodieTable table,
@@ -60,10 +58,10 @@ public class FlinkPartitionTTLActionExecutor<T> extends 
BaseFlinkCommitActionExe
       if (expiredPartitions.isEmpty()) {
         return emptyResult;
       }
-      LOG.info("Partition ttl find the following expired partitions to delete: 
 " + String.join(",", expiredPartitions));
+      log.info("Partition ttl find the following expired partitions to delete: 
 " + String.join(",", expiredPartitions));
       return new FlinkAutoCommitActionExecutor(new 
FlinkDeletePartitionCommitActionExecutor<>(context, config, table, instantTime, 
expiredPartitions)).execute();
     } catch (HoodieDeletePartitionPendingTableServiceException 
deletePartitionPendingTableServiceException) {
-      LOG.info("Partition is under table service, do nothing, call delete 
partition next time.");
+      log.info("Partition is under table service, do nothing, call delete 
partition next time.");
       return emptyResult;
     } catch (IOException e) {
       throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index cdcdb5a2e4f4..1e48c6f00228 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -34,8 +34,7 @@ import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.List;
@@ -47,10 +46,10 @@ import java.util.List;
  *
  * <p>Note: the compaction logic is invoked through the flink pipeline.
  */
+@Slf4j
 @SuppressWarnings("checkstyle:LineLength")
 public class HoodieFlinkMergeOnReadTableCompactor<T>
     extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
 
   @Override
   public void preCompact(
@@ -74,7 +73,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T>
                                    HoodieReaderContext<?> readerContext,
                                    HoodieTable table) throws IOException {
     String maxInstantTime = getMaxInstantTime(table.getMetaClient());
-    LOG.info("Compact using file group reader based compaction, operation: 
{}.", operation);
+    log.info("Compact using file group reader based compaction, operation: 
{}.", operation);
     return compact(
         writeConfig,
         operation,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index 5f48a60763ae..1ba5ef32d4f7 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -39,6 +39,7 @@ import org.apache.hudi.util.RowDataUtils;
 import org.apache.hudi.util.RowProjection;
 import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
 
+import lombok.Setter;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -61,6 +62,7 @@ public class FlinkRecordContext extends 
RecordContext<RowData> {
   // for DELETE cases, it'll not be initialized if primary key semantics is 
lost.
   // For e.g, if the pk fields are [a, b] but user only select a, then the pk
   // semantics is lost.
+  @Setter
   private RecordKeyToRowDataConverter recordKeyRowConverter;
   private OrderingValueEngineTypeConverter orderingValueConverter;
 
@@ -201,10 +203,6 @@ public class FlinkRecordContext extends 
RecordContext<RowData> {
     return rowProjection::project;
   }
 
-  public void setRecordKeyRowConverter(RecordKeyToRowDataConverter 
recordKeyRowConverter) {
-    this.recordKeyRowConverter = recordKeyRowConverter;
-  }
-
   public void initOrderingValueConverter(Schema dataSchema, List<String> 
orderingFieldNames) {
     this.orderingValueConverter = 
OrderingValueEngineTypeConverter.create(dataSchema, orderingFieldNames, 
utcTimezone);
   }
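
The hunk above swaps the hand-written setRecordKeyRowConverter(...) for a field-level @Setter, which makes Lombok generate a public setter for just that field. A minimal sketch with a hypothetical class and field:

    import lombok.Setter;

    class ContextLike {
      // Lombok generates: public void setKeyConverter(Object keyConverter)
      @Setter
      private Object keyConverter;
    }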
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
index c56494a7cce0..bafb5c017856 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
@@ -27,17 +27,18 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
 /**
  * Flink upgrade and downgrade helper.
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class FlinkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
 
   private static final FlinkUpgradeDowngradeHelper SINGLETON_INSTANCE =
       new FlinkUpgradeDowngradeHelper();
 
-  private FlinkUpgradeDowngradeHelper() {
-  }
-
   public static FlinkUpgradeDowngradeHelper getInstance() {
     return SINGLETON_INSTANCE;
   }
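
For the singleton-style FlinkUpgradeDowngradeHelper, the empty private constructor is replaced by @NoArgsConstructor(access = AccessLevel.PRIVATE), which generates a private no-args constructor so the class can only instantiate itself. A sketch of the same idiom on a hypothetical helper class:

    import lombok.AccessLevel;
    import lombok.NoArgsConstructor;

    // The generated no-args constructor is private, preserving the singleton pattern.
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    class SingletonHelper {
      private static final SingletonHelper INSTANCE = new SingletonHelper();

      static SingletonHelper getInstance() {
        return INSTANCE;
      }
    }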
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 72886782a162..f9107e6df53e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -297,6 +299,7 @@ public class AvroToRowDataConverters {
    * Encapsulates joda optional dependency. Instantiates this class only if 
joda is available on the
    * classpath.
    */
+  @NoArgsConstructor(access = AccessLevel.PRIVATE)
   static class JodaConverter {
 
     private static JodaConverter instance;
@@ -332,8 +335,5 @@ public class AvroToRowDataConverters {
       final DateTime value = (DateTime) object;
       return value.toDate().getTime();
     }
-
-    private JodaConverter() {
-    }
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
index a61c8ffc0e0c..ad020b41912d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
@@ -18,6 +18,14 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter;
+import org.apache.hudi.util.RowDataToAvroConverters.RowDataToAvroConverter;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.apache.avro.Schema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -25,12 +33,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.util.RowDataToAvroConverters.RowDataToAvroConverter;
-import org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -86,26 +88,17 @@ public class RowDataAvroQueryContexts {
     });
   }
 
+  @AllArgsConstructor
   public static class RowDataQueryContext {
+    @Getter
     private final DataType rowType;
     private final Map<String, FieldQueryContext> contextMap;
     private final RowData.FieldGetter[] fieldGetters;
+    @Getter
     private final RowDataToAvroConverter rowDataToAvroConverter;
+    @Getter
     private final AvroToRowDataConverter avroToRowDataConverter;
 
-    private RowDataQueryContext(
-        DataType rowType,
-        Map<String, FieldQueryContext> contextMap,
-        RowData.FieldGetter[] fieldGetters,
-        RowDataToAvroConverter rowDataAvroConverter,
-        AvroToRowDataConverter avroToRowDataConverter) {
-      this.rowType = rowType;
-      this.contextMap = contextMap;
-      this.fieldGetters = fieldGetters;
-      this.rowDataToAvroConverter = rowDataAvroConverter;
-      this.avroToRowDataConverter = avroToRowDataConverter;
-    }
-
     public static RowDataQueryContext create(
         DataType rowType,
         Map<String, FieldQueryContext> contextMap,
@@ -122,22 +115,12 @@ public class RowDataAvroQueryContexts {
     public RowData.FieldGetter[] fieldGetters() {
       return fieldGetters;
     }
-
-    public RowDataToAvroConverter getRowDataToAvroConverter() {
-      return rowDataToAvroConverter;
-    }
-
-    public AvroToRowDataConverter getAvroToRowDataConverter() {
-      return avroToRowDataConverter;
-    }
-
-    public DataType getRowType() {
-      return this.rowType;
-    }
   }
 
   public static class FieldQueryContext {
+    @Getter
     private final LogicalType logicalType;
+    @Getter
     private final RowData.FieldGetter fieldGetter;
     private final Function<Object, Object> javaTypeConverter;
     private FieldQueryContext(LogicalType logicalType, RowData.FieldGetter 
fieldGetter, boolean utcTimezone) {
@@ -150,14 +133,6 @@ public class RowDataAvroQueryContexts {
       return new FieldQueryContext(logicalType, fieldGetter, utcTimezone);
     }
 
-    public LogicalType getLogicalType() {
-      return logicalType;
-    }
-
-    public RowData.FieldGetter getFieldGetter() {
-      return fieldGetter;
-    }
-
     public Object getValAsJava(RowData rowData) {
       return getValAsJava(rowData, true);
     }
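
In RowDataQueryContext above, a class-level @AllArgsConstructor is combined with field-level @Getter annotations: the generated constructor covers every field in declaration order, while getters are produced only for the annotated fields. A short sketch of that combination on a hypothetical class:

    import lombok.AllArgsConstructor;
    import lombok.Getter;

    @AllArgsConstructor
    class QueryContextLike {
      @Getter
      private final String fieldName;      // getFieldName() is generated
      private final int internalOrdinal;   // no getter generated for this field
    }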
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index bc45999900a6..1405057019a8 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -38,11 +38,10 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -55,8 +54,8 @@ import java.util.stream.Collectors;
 /**
  * Flink hoodie writable table.
  */
+@Slf4j
 public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkWriteableTestTable.class);
 
   private HoodieFlinkWriteableTestTable(String basePath, HoodieStorage storage,
                                         HoodieTableMetaClient metaClient, 
Schema schema,
@@ -151,7 +150,7 @@ public class HoodieFlinkWriteableTestTable extends 
HoodieWriteableTestTable {
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), 
r.getPartitionPath(), "");
           return (IndexedRecord) val;
         } catch (IOException e) {
-          LOG.warn("Failed to convert record " + r.toString(), e);
+          log.warn("Failed to convert record " + r.toString(), e);
           return null;
         }
       }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), 
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
