This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 63698543a78f refactor: Add lombok annotations to hudi-flink-client
module (#17534)
63698543a78f is described below
commit 63698543a78fb0c377e507728b48866e4e5a28fb
Author: voonhous <[email protected]>
AuthorDate: Wed Dec 10 10:15:10 2025 +0800
refactor: Add lombok annotations to hudi-flink-client module (#17534)
---
hudi-client/hudi-flink-client/pom.xml | 6 +++
.../hudi/client/HoodieFlinkTableServiceClient.java | 14 +++---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 17 +++----
.../FlinkSizeBasedClusteringPlanStrategy.java | 5 +-
.../hudi/client/model/HoodieFlinkInternalRow.java | 6 +--
.../hudi/index/state/FlinkInMemoryStateIndex.java | 6 +--
.../apache/hudi/io/ExplicitWriteHandleFactory.java | 13 ++----
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 8 ++--
.../java/org/apache/hudi/io/FlinkConcatHandle.java | 7 ++-
.../java/org/apache/hudi/io/FlinkCreateHandle.java | 16 +++----
...FileGroupReaderBasedIncrementalMergeHandle.java | 6 +--
.../io/FlinkFileGroupReaderBasedMergeHandle.java | 14 +++---
.../hudi/io/FlinkIncrementalConcatHandle.java | 7 ++-
.../hudi/io/FlinkIncrementalMergeHandle.java | 6 +--
.../FlinkIncrementalMergeHandleWithChangeLog.java | 6 +--
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 16 +++----
.../hudi/io/FlinkMergeHandleWithChangeLog.java | 6 +--
.../io/storage/row/HoodieRowDataCreateHandle.java | 7 ++-
.../row/parquet/ParquetSchemaConverter.java | 5 +-
.../apache/hudi/io/v2/RowDataLogWriteHandle.java | 8 ++--
.../FlinkHoodieBackedTableMetadataWriter.java | 5 +-
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 8 ++--
.../commit/BaseFlinkCommitActionExecutor.java | 10 ++--
.../commit/FlinkPartitionTTLActionExecutor.java | 10 ++--
.../HoodieFlinkMergeOnReadTableCompactor.java | 7 ++-
.../hudi/table/format/FlinkRecordContext.java | 6 +--
.../table/upgrade/FlinkUpgradeDowngradeHelper.java | 7 +--
.../apache/hudi/util/AvroToRowDataConverters.java | 6 +--
.../apache/hudi/util/RowDataAvroQueryContexts.java | 53 ++++++----------------
.../testutils/HoodieFlinkWriteableTestTable.java | 7 ++-
30 files changed, 117 insertions(+), 181 deletions(-)
diff --git a/hudi-client/hudi-flink-client/pom.xml
b/hudi-client/hudi-flink-client/pom.xml
index 235a173ddaf3..494fcd9cbf01 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -126,6 +126,12 @@
<artifactId>kryo-shaded</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<!-- Hoodie - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index a0580175ea2f..74acada85bcc 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -48,17 +48,15 @@ import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.FlinkClientUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.text.ParseException;
import java.util.List;
import java.util.stream.Collectors;
+@Slf4j
public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClient<List<HoodieRecord<T>>, List<WriteStatus>,
List<WriteStatus>> {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class);
-
protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService>
timelineService) {
@@ -82,7 +80,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
// Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
writeTableMetadata(table, compactionCommitTime, metadata);
- LOG.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
+ log.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
} finally {
this.txnManager.endStateChange(Option.of(compactionInstant));
@@ -100,7 +98,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
}
- LOG.info("Compacted successfully on commit " + compactionCommitTime);
+ log.info("Compacted successfully on commit " + compactionCommitTime);
}
protected void completeClustering(
@@ -129,7 +127,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
// Because more than one write to metadata table will result in
conflicts since all of them updates the same partition.
writeTableMetadata(table, clusteringCommitTime, metadata);
- LOG.info("Committing Clustering {} finished with result {}.",
clusteringCommitTime, metadata);
+ log.info("Committing Clustering {} finished with result {}.",
clusteringCommitTime, metadata);
ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(
false,
clusteringInstant,
@@ -155,7 +153,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
+ config.getBasePath() + " at time " + clusteringCommitTime, e);
}
}
- LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+ log.info("Clustering successfully on commit " + clusteringCommitTime);
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index d380b934de27..82f15e8832cd 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -50,9 +50,10 @@ import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import com.codahale.metrics.Timer;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Comparator;
@@ -71,13 +72,12 @@ import java.util.stream.Collectors;
*
* @param <T> type of the payload
*/
+@Slf4j
@SuppressWarnings("checkstyle:LineLength")
public class HoodieFlinkWriteClient<T>
extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>>
implements FlinkRowDataWriteClient<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
-
/**
* FileID to write handle mapping in order to record the write handles for
each file group,
* so that we can append the mini-batch data buffer incrementally.
@@ -341,9 +341,9 @@ public class HoodieFlinkWriteClient<T>
*/
public void waitForCleaningFinish() {
if (tableServiceClient.asyncCleanerService != null) {
- LOG.info("Cleaner has been spawned already. Waiting for it to finish");
+ log.info("Cleaner has been spawned already. Waiting for it to finish");
tableServiceClient.asyncClean();
- LOG.info("Cleaner has finished");
+ log.info("Cleaner has finished");
}
}
@@ -495,6 +495,7 @@ public class HoodieFlinkWriteClient<T>
}
private final class AutoCloseableWriteHandle implements AutoCloseable {
+ @Getter(AccessLevel.PACKAGE)
private final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
AutoCloseableWriteHandle(
@@ -514,10 +515,6 @@ public class HoodieFlinkWriteClient<T>
this.writeHandle = getOrCreateWriteHandle(bucketInfo, getConfig(),
instantTime, table, recordIterator, overwrite);
}
- HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
- return writeHandle;
- }
-
@Override
public void close() {
((MiniBatchHandle) writeHandle).closeGracefully();
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
index a6684339b773..d5f82532c649 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategy.java
@@ -31,8 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
@@ -46,9 +45,9 @@ import static
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_C
* 1) Creates clustering groups based on max size allowed per group.
* 2) Excludes files that are greater than 'small.file.limit' from clustering
plan.
*/
+@Slf4j
public class FlinkSizeBasedClusteringPlanStrategy<T>
extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkSizeBasedClusteringPlanStrategy.class);
public FlinkSizeBasedClusteringPlanStrategy(HoodieTable table,
HoodieEngineContext
engineContext,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
index 608301e4d901..c54e41eb3b21 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkInternalRow.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.model;
import org.apache.hudi.common.model.HoodieOperation;
+import lombok.Getter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -55,6 +56,7 @@ public class HoodieFlinkInternalRow implements Serializable {
// there is no rowData for index record
private final BooleanValue isIndexRecord;
+ @Getter
private final RowData rowData;
public HoodieFlinkInternalRow(String recordKey, String partitionPath, String
operationType, RowData rowData) {
@@ -118,10 +120,6 @@ public class HoodieFlinkInternalRow implements
Serializable {
return isIndexRecord.getValue();
}
- public RowData getRowData() {
- return rowData;
- }
-
public HoodieFlinkInternalRow copy(RowDataSerializer rowDataSerializer) {
return new HoodieFlinkInternalRow(
this.recordKey.toString(),
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
index 693286df3a66..1d38fb7b7b18 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/state/FlinkInMemoryStateIndex.java
@@ -28,18 +28,16 @@ import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Hoodie index implementation backed by flink state.
*/
+@Slf4j
public class FlinkInMemoryStateIndex extends HoodieIndex<List<HoodieRecord>,
List<WriteStatus>> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkInMemoryStateIndex.class);
-
public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
super(config);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
index c040603db4c4..68cf4af675af 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java
@@ -22,17 +22,18 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
/**
* Create handle factory for Flink writer, use the specified write handle
directly.
*/
+@AllArgsConstructor
+@Getter
public class ExplicitWriteHandleFactory<T, I, K, O>
extends WriteHandleFactory<T, I, K, O> {
private final HoodieWriteHandle<T, I, K, O> writeHandle;
- public ExplicitWriteHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle)
{
- this.writeHandle = writeHandle;
- }
-
@Override
public HoodieWriteHandle<T, I, K, O> create(
HoodieWriteConfig hoodieConfig, String commitTime,
@@ -40,8 +41,4 @@ public class ExplicitWriteHandleFactory<T, I, K, O>
String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
return writeHandle;
}
-
- public HoodieWriteHandle<T, I, K, O> getWriteHandle() {
- return writeHandle;
- }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index f40fb84b21b0..030743477ff2 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -28,8 +28,7 @@ import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.List;
@@ -44,11 +43,10 @@ import java.util.List;
* <p>The back-up writer may rollover on condition(for e.g, the filesystem
does not support append
* or the file size hits the configured threshold).
*/
+@Slf4j
public class FlinkAppendHandle<T, I, K, O>
extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkAppendHandle.class);
-
private boolean isClosed = false;
private final WriteMarkers writeMarkers;
private final BucketType bucketType;
@@ -119,7 +117,7 @@ public class FlinkAppendHandle<T, I, K, O>
} catch (Throwable throwable) {
// The intermediate log file can still append based on the incremental
MERGE semantics,
// there is no need to delete the file.
- LOG.warn("Failed to close the APPEND handle", throwable);
+ log.warn("Failed to close the APPEND handle", throwable);
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
index 5979891490fc..f7249b88e9d1 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkConcatHandle.java
@@ -25,8 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
@@ -38,9 +37,9 @@ import java.util.Iterator;
* <P>The records iterator for super constructor is reset as empty thus the
initialization for new records
* does nothing. This handle keep the iterator for itself to override the
write behavior.
*/
+@Slf4j
public class FlinkConcatHandle<T, I, K, O>
extends FlinkMergeHandle<T, I, K, O> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkConcatHandle.class);
// a representation of incoming records that tolerates duplicate keys
private final Iterator<HoodieRecord<T>> recordItr;
@@ -65,7 +64,7 @@ public class FlinkConcatHandle<T, I, K, O>
String errMsg = String.format(
"Failed to write old record into new file for key %s from old file
%s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, oldSchema.toString(true));
- LOG.debug("Old record is {}", oldRecord);
+ log.debug("Old record is {}", oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index d277639af973..038c6f8664e6 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -31,8 +31,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
@@ -47,11 +46,10 @@ import java.util.List;
*
* @see FlinkIncrementalMergeHandle
*/
+@Slf4j
public class FlinkCreateHandle<T, I, K, O>
extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkCreateHandle.class);
-
private boolean isClosed = false;
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
@@ -92,7 +90,7 @@ public class FlinkCreateHandle<T, I, K, O>
final StoragePath path = makeNewFilePath(partitionPath, lastDataFileName);
try {
if (storage.exists(path)) {
- LOG.info("Deleting invalid INSERT file due to task retry: " +
lastDataFileName);
+ log.info("Deleting invalid INSERT file due to task retry: " +
lastDataFileName);
storage.deleteFile(path);
}
} catch (IOException e) {
@@ -118,7 +116,7 @@ public class FlinkCreateHandle<T, I, K, O>
while (storage.exists(path)) {
StoragePath existing = path;
path = newFilePathWithRollover(rollNumber++);
- LOG.warn("Duplicate write for INSERT bucket with path: {}. Will write
to new path [{}] instead", existing, path);
+ log.warn("Duplicate write for INSERT bucket with path: {}. Will write
to new path [{}] instead", existing, path);
}
return path;
} catch (IOException e) {
@@ -157,13 +155,13 @@ public class FlinkCreateHandle<T, I, K, O>
try {
close();
} catch (Throwable throwable) {
- LOG.error("Failed to close the CREATE handle", throwable);
+ log.error("Failed to close the CREATE handle", throwable);
try {
storage.deleteFile(path);
- LOG.info("Successfully deleted the intermediate CREATE data file: {}",
path);
+ log.info("Successfully deleted the intermediate CREATE data file: {}",
path);
} catch (IOException e) {
// logging a warning and ignore the exception.
- LOG.warn("Failed to delete the intermediate CREATE data file: {}",
path, e);
+ log.warn("Failed to delete the intermediate CREATE data file: {}",
path, e);
}
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
index a8e122c50f20..fa5feb8649b5 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedIncrementalMergeHandle.java
@@ -28,8 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Iterator;
@@ -46,12 +45,11 @@ import java.util.List;
* for the previous mini-batch ingestion will be rewritten as the last file
path before committing,
* which behaves like all the data are written into the last file.
*/
+@Slf4j
public class FlinkFileGroupReaderBasedIncrementalMergeHandle<T, I, K, O>
extends FlinkFileGroupReaderBasedMergeHandle<T, I, K, O>
implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkFileGroupReaderBasedIncrementalMergeHandle.class);
-
public FlinkFileGroupReaderBasedIncrementalMergeHandle(HoodieWriteConfig
config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier
taskContextSupplier, StoragePath basePath) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
index 7ab517662b2a..fa68e055cec5 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkFileGroupReaderBasedMergeHandle.java
@@ -30,8 +30,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Iterator;
@@ -46,12 +45,11 @@ import java.util.Iterator;
* the file path when the data buffer writes finish. When next data buffer
write starts,
* {@link FlinkFileGroupReaderBasedIncrementalMergeHandle} will be used to
write records into a rollover file.
*/
+@Slf4j
public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O>
extends FileGroupReaderBasedMergeHandle<T, I, K, O>
implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkFileGroupReaderBasedMergeHandle.class);
-
public FlinkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>>
recordItr, String partitionPath, String fileId,
TaskContextSupplier
taskContextSupplier) {
@@ -104,7 +102,7 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K,
O>
}
try {
if (storage.exists(path)) {
- LOG.info("Deleting invalid MERGE base file due to task retry: {}",
lastDataFileName);
+ log.info("Deleting invalid MERGE base file due to task retry: {}",
lastDataFileName);
storage.deleteFile(path);
}
} catch (IOException e) {
@@ -133,13 +131,13 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I,
K, O>
try {
close();
} catch (Throwable throwable) {
- LOG.error("Failed to close the MERGE handle", throwable);
+ log.error("Failed to close the MERGE handle", throwable);
try {
storage.deleteFile(newFilePath);
- LOG.info("Successfully deleted the intermediate MERGE data file: {}",
newFilePath);
+ log.info("Successfully deleted the intermediate MERGE data file: {}",
newFilePath);
} catch (IOException e) {
// logging a warning and ignore the exception.
- LOG.warn("Failed to delete the intermediate MERGE data file: {}",
newFilePath, e);
+ log.warn("Failed to delete the intermediate MERGE data file: {}",
newFilePath, e);
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
index 3b4fee36efef..c35416620d3c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalConcatHandle.java
@@ -26,8 +26,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
@@ -39,9 +38,9 @@ import java.util.Iterator;
* <P>The records iterator for super constructor is reset as empty thus the
initialization for new records
* does nothing. This handle keep the iterator for itself to override the
write behavior.
*/
+@Slf4j
public class FlinkIncrementalConcatHandle<T, I, K, O>
extends FlinkIncrementalMergeHandle<T, I, K, O> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkIncrementalConcatHandle.class);
// a representation of incoming records that tolerates duplicate keys
private final Iterator<HoodieRecord<T>> recordItr;
@@ -66,7 +65,7 @@ public class FlinkIncrementalConcatHandle<T, I, K, O>
String errMsg = String.format(
"Failed to write old record into new file for key %s from old file
%s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath,
writeSchemaWithMetaFields.toString(true));
- LOG.debug("Old record is {}", oldRecord);
+ log.debug("Old record is {}", oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
index 9f3680af0543..162e8ef4c957 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandle.java
@@ -28,8 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Iterator;
@@ -46,12 +45,11 @@ import java.util.List;
* for the previous mini-batch ingestion will be rewritten as the final file
path before committing,
* which behaves like all the data are written into the last file.
*/
+@Slf4j
public class FlinkIncrementalMergeHandle<T, I, K, O>
extends FlinkMergeHandle<T, I, K, O>
implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkIncrementalMergeHandle.class);
-
public FlinkIncrementalMergeHandle(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr,
String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier,
StoragePath basePath) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
index d4b2cb989c8c..8b6ea645905b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkIncrementalMergeHandleWithChangeLog.java
@@ -30,10 +30,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
@@ -45,11 +44,10 @@ import java.util.List;
* <p>The cdc about logic is copied from {@link
HoodieMergeHandleWithChangeLog},
* we should refactor it out when there are good abstractions.
*/
+@Slf4j
public class FlinkIncrementalMergeHandleWithChangeLog<T, I, K, O>
extends FlinkIncrementalMergeHandle<T, I, K, O> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkIncrementalMergeHandleWithChangeLog.class);
-
private final HoodieCDCLogger cdcLogger;
public FlinkIncrementalMergeHandleWithChangeLog(HoodieWriteConfig config,
String instantTime, HoodieTable<T, I, K, O> hoodieTable,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index e477a02527e5..3505a0aeea15 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -30,8 +30,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.HashMap;
@@ -47,12 +46,11 @@ import java.util.Iterator;
* the file path when the data buffer writes finish. When next data buffer
write starts,
* {@link FlinkIncrementalMergeHandle} will be used to write records into a
rollover file.
*/
+@Slf4j
public class FlinkMergeHandle<T, I, K, O>
extends HoodieWriteMergeHandle<T, I, K, O>
implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkMergeHandle.class);
-
public FlinkMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String
partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
@@ -98,7 +96,7 @@ public class FlinkMergeHandle<T, I, K, O>
}
try {
if (storage.exists(path)) {
- LOG.info("Deleting invalid MERGE base file due to task retry: {}",
lastDataFileName);
+ log.info("Deleting invalid MERGE base file due to task retry: {}",
lastDataFileName);
storage.deleteFile(path);
}
} catch (IOException e) {
@@ -114,7 +112,7 @@ public class FlinkMergeHandle<T, I, K, O>
@Override
protected void initIncomingRecordsMap() {
- LOG.info("Initialize on-heap keyToNewRecords for incoming records.");
+ log.info("Initialize on-heap keyToNewRecords for incoming records.");
// the incoming records are already buffered on heap and the underlying
bytes are managed by memory pool
// in Flink write buffer, so there is no need to use ExternalSpillableMap.
this.keyToNewRecords = new HashMap<>();
@@ -135,13 +133,13 @@ public class FlinkMergeHandle<T, I, K, O>
try {
close();
} catch (Throwable throwable) {
- LOG.error("Failed to close the MERGE handle", throwable);
+ log.error("Failed to close the MERGE handle", throwable);
try {
storage.deleteFile(newFilePath);
- LOG.info("Successfully deleted the intermediate MERGE data file: {}",
newFilePath);
+ log.info("Successfully deleted the intermediate MERGE data file: {}",
newFilePath);
} catch (IOException e) {
// logging a warning and ignore the exception.
- LOG.warn("Failed to delete the intermediate MERGE data file: {}",
newFilePath, e);
+ log.warn("Failed to delete the intermediate MERGE data file: {}",
newFilePath, e);
}
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index bc1af16cfc76..17f48e9e3131 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -29,10 +29,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
@@ -44,12 +43,11 @@ import java.util.List;
* <p>The cdc about logic is copied from {@link
HoodieMergeHandleWithChangeLog},
* we should refactor it out when there are good abstractions.
*/
+@Slf4j
public class FlinkMergeHandleWithChangeLog<T, I, K, O>
extends FlinkMergeHandle<T, I, K, O> {
private final HoodieCDCLogger cdcLogger;
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkMergeHandleWithChangeLog.class);
-
public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr,
String partitionPath, String fileId,
TaskContextSupplier
taskContextSupplier) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 170b6e46c34d..f54254baebfb 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -39,11 +39,10 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -54,10 +53,10 @@ import static
org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
/**
* Create handle with RowData for datasource implementation of bulk insert.
*/
+@Slf4j
public class HoodieRowDataCreateHandle implements Serializable {
private static final long serialVersionUID = 1L;
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieRowDataCreateHandle.class);
private static final AtomicLong SEQGEN = new AtomicLong(1);
private final String instantTime;
@@ -115,7 +114,7 @@ public class HoodieRowDataCreateHandle implements
Serializable {
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize file writer for
path " + path, e);
}
- LOG.info("New handle created for partition :" + partitionPath + " with
fileId " + fileId);
+ log.info("New handle created for partition :" + partitionPath + " with
fileId " + fileId);
}
/**
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index ec3fbc581d3c..ec8a6fe4dca4 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io.storage.row.parquet;
import org.apache.hudi.common.util.collection.Pair;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
@@ -37,8 +38,6 @@ import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
@@ -50,8 +49,8 @@ import static
org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
*
* <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter
to support timestamp of INT64 8 bytes.
*/
+@Slf4j
public class ParquetSchemaConverter {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ParquetSchemaConverter.class);
static final String MAP_REPEATED_NAME = "key_value";
static final String MAP_KEY_NAME = "key";
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
index c7f00da98d3f..f7df0ff269cf 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
@@ -42,8 +42,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.util.Lazy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.HashSet;
@@ -68,11 +67,10 @@ import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_CO
* <p>The back-up writer may roll over to a new log file if there already
exists a log file for the
* given file group and instant.
*/
+@Slf4j
public class RowDataLogWriteHandle<T, I, K, O>
extends FlinkAppendHandle<T, I, K, O> implements MiniBatchHandle {
- private static final Logger LOG =
LoggerFactory.getLogger(RowDataLogWriteHandle.class);
-
public RowDataLogWriteHandle(
HoodieWriteConfig config,
String instantTime,
@@ -132,7 +130,7 @@ public class RowDataLogWriteHandle<T, I, K, O>
}
resetWriteCounts();
assert stat.getRuntimeStats() != null;
- LOG.info("WriteHandle for partitionPath {} filePath {}, took {} ms.",
+ log.info("WriteHandle for partitionPath {} filePath {}, took {} ms.",
partitionPath, stat.getPath(),
stat.getRuntimeStats().getTotalUpsertTime());
timer.startTimer();
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index d02ec7b5f90a..e69a7fbbc622 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -39,8 +39,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.List;
@@ -51,8 +50,8 @@ import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGE
/**
* Flink hoodie backed table metadata writer.
*/
+@Slf4j
public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetadataWriter<List<HoodieRecord>, List<WriteStatus>> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index aee1da3a8134..a6cf00fb6f26 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -41,8 +41,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.IOUtils;
@@ -69,8 +69,7 @@ import
org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecuto
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
@@ -86,11 +85,10 @@ import java.util.Map;
* <p>
* UPDATES - Produce a new version of the file, just replacing the updated
records with new values
*/
+@Slf4j
public class HoodieFlinkCopyOnWriteTable<T>
extends HoodieFlinkTable<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkCopyOnWriteTable.class);
-
public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config,
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 7a4d9f90e5e9..b8363a9f052c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -34,14 +34,13 @@ import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.time.Duration;
@@ -63,11 +62,10 @@ import java.util.stream.Collectors;
* <p>Computing the records batch locations all at a time is a pressure to the
engine,
* we should avoid that in streaming system.
*/
+@Slf4j
public abstract class BaseFlinkCommitActionExecutor<T> extends
BaseCommitActionExecutor<T, Iterator<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>, HoodieWriteMetadata> {
- private static final Logger LOG =
LoggerFactory.getLogger(BaseFlinkCommitActionExecutor.class);
-
protected HoodieWriteHandle<?, ?, ?, ?> writeHandle;
protected final BucketInfo bucketInfo;
@@ -182,7 +180,7 @@ public abstract class BaseFlinkCommitActionExecutor<T>
extends
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + bucketType + " for
partition :" + partitionPath;
- LOG.error(msg, t);
+ log.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
index d8f3f79b1ec6..dc3b38c5d810 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
@@ -31,17 +31,15 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import
org.apache.hudi.table.action.ttl.strategy.HoodiePartitionTTLStrategyFactory;
import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+@Slf4j
public class FlinkPartitionTTLActionExecutor<T> extends
BaseFlinkCommitActionExecutor<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(FlinkPartitionTTLActionExecutor.class);
-
public FlinkPartitionTTLActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
@@ -60,10 +58,10 @@ public class FlinkPartitionTTLActionExecutor<T> extends
BaseFlinkCommitActionExe
if (expiredPartitions.isEmpty()) {
return emptyResult;
}
- LOG.info("Partition ttl find the following expired partitions to delete:
" + String.join(",", expiredPartitions));
+ log.info("Partition ttl find the following expired partitions to delete:
" + String.join(",", expiredPartitions));
return new FlinkAutoCommitActionExecutor(new
FlinkDeletePartitionCommitActionExecutor<>(context, config, table, instantTime,
expiredPartitions)).execute();
} catch (HoodieDeletePartitionPendingTableServiceException
deletePartitionPendingTableServiceException) {
- LOG.info("Partition is under table service, do nothing, call delete
partition next time.");
+ log.info("Partition is under table service, do nothing, call delete
partition next time.");
return emptyResult;
} catch (IOException e) {
throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index cdcdb5a2e4f4..1e48c6f00228 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -34,8 +34,7 @@ import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
@@ -47,10 +46,10 @@ import java.util.List;
*
* <p>Note: the compaction logic is invoked through the flink pipeline.
*/
+@Slf4j
@SuppressWarnings("checkstyle:LineLength")
public class HoodieFlinkMergeOnReadTableCompactor<T>
extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
@Override
public void preCompact(
@@ -74,7 +73,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T>
HoodieReaderContext<?> readerContext,
HoodieTable table) throws IOException {
String maxInstantTime = getMaxInstantTime(table.getMetaClient());
- LOG.info("Compact using file group reader based compaction, operation:
{}.", operation);
+ log.info("Compact using file group reader based compaction, operation:
{}.", operation);
return compact(
writeConfig,
operation,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index 5f48a60763ae..1ba5ef32d4f7 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -39,6 +39,7 @@ import org.apache.hudi.util.RowDataUtils;
import org.apache.hudi.util.RowProjection;
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
+import lombok.Setter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -61,6 +62,7 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
// for DELETE cases, it'll not be initialized if primary key semantics is
lost.
// For e.g, if the pk fields are [a, b] but user only select a, then the pk
// semantics is lost.
+ @Setter
private RecordKeyToRowDataConverter recordKeyRowConverter;
private OrderingValueEngineTypeConverter orderingValueConverter;
@@ -201,10 +203,6 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
return rowProjection::project;
}
- public void setRecordKeyRowConverter(RecordKeyToRowDataConverter
recordKeyRowConverter) {
- this.recordKeyRowConverter = recordKeyRowConverter;
- }
-
public void initOrderingValueConverter(Schema dataSchema, List<String>
orderingFieldNames) {
this.orderingValueConverter =
OrderingValueEngineTypeConverter.create(dataSchema, orderingFieldNames,
utcTimezone);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
index c56494a7cce0..bafb5c017856 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
@@ -27,17 +27,18 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
/**
* Flink upgrade and downgrade helper.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class FlinkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
private static final FlinkUpgradeDowngradeHelper SINGLETON_INSTANCE =
new FlinkUpgradeDowngradeHelper();
- private FlinkUpgradeDowngradeHelper() {
- }
-
public static FlinkUpgradeDowngradeHelper getInstance() {
return SINGLETON_INSTANCE;
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 72886782a162..f9107e6df53e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -18,6 +18,8 @@
package org.apache.hudi.util;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -297,6 +299,7 @@ public class AvroToRowDataConverters {
* Encapsulates joda optional dependency. Instantiates this class only if
joda is available on the
* classpath.
*/
+ @NoArgsConstructor(access = AccessLevel.PRIVATE)
static class JodaConverter {
private static JodaConverter instance;
@@ -332,8 +335,5 @@ public class AvroToRowDataConverters {
final DateTime value = (DateTime) object;
return value.toDate().getTime();
}
-
- private JodaConverter() {
- }
}
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
index a61c8ffc0e0c..ad020b41912d 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
@@ -18,6 +18,14 @@
package org.apache.hudi.util;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter;
+import org.apache.hudi.util.RowDataToAvroConverters.RowDataToAvroConverter;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.Schema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
@@ -25,12 +33,6 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.util.RowDataToAvroConverters.RowDataToAvroConverter;
-import org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter;
-
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -86,26 +88,17 @@ public class RowDataAvroQueryContexts {
});
}
+ @AllArgsConstructor
public static class RowDataQueryContext {
+ @Getter
private final DataType rowType;
private final Map<String, FieldQueryContext> contextMap;
private final RowData.FieldGetter[] fieldGetters;
+ @Getter
private final RowDataToAvroConverter rowDataToAvroConverter;
+ @Getter
private final AvroToRowDataConverter avroToRowDataConverter;
- private RowDataQueryContext(
- DataType rowType,
- Map<String, FieldQueryContext> contextMap,
- RowData.FieldGetter[] fieldGetters,
- RowDataToAvroConverter rowDataAvroConverter,
- AvroToRowDataConverter avroToRowDataConverter) {
- this.rowType = rowType;
- this.contextMap = contextMap;
- this.fieldGetters = fieldGetters;
- this.rowDataToAvroConverter = rowDataAvroConverter;
- this.avroToRowDataConverter = avroToRowDataConverter;
- }
-
public static RowDataQueryContext create(
DataType rowType,
Map<String, FieldQueryContext> contextMap,
@@ -122,22 +115,12 @@ public class RowDataAvroQueryContexts {
public RowData.FieldGetter[] fieldGetters() {
return fieldGetters;
}
-
- public RowDataToAvroConverter getRowDataToAvroConverter() {
- return rowDataToAvroConverter;
- }
-
- public AvroToRowDataConverter getAvroToRowDataConverter() {
- return avroToRowDataConverter;
- }
-
- public DataType getRowType() {
- return this.rowType;
- }
}
public static class FieldQueryContext {
+ @Getter
private final LogicalType logicalType;
+ @Getter
private final RowData.FieldGetter fieldGetter;
private final Function<Object, Object> javaTypeConverter;
private FieldQueryContext(LogicalType logicalType, RowData.FieldGetter
fieldGetter, boolean utcTimezone) {
@@ -150,14 +133,6 @@ public class RowDataAvroQueryContexts {
return new FieldQueryContext(logicalType, fieldGetter, utcTimezone);
}
- public LogicalType getLogicalType() {
- return logicalType;
- }
-
- public RowData.FieldGetter getFieldGetter() {
- return fieldGetter;
- }
-
public Object getValAsJava(RowData rowData) {
return getValAsJava(rowData, true);
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index bc45999900a6..1405057019a8 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -38,11 +38,10 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -55,8 +54,8 @@ import java.util.stream.Collectors;
/**
* Flink hoodie writable table.
*/
+@Slf4j
public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkWriteableTestTable.class);
private HoodieFlinkWriteableTestTable(String basePath, HoodieStorage storage,
HoodieTableMetaClient metaClient,
Schema schema,
@@ -151,7 +150,7 @@ public class HoodieFlinkWriteableTestTable extends
HoodieWriteableTestTable {
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(),
r.getPartitionPath(), "");
return (IndexedRecord) val;
} catch (IOException e) {
- LOG.warn("Failed to convert record " + r.toString(), e);
+ log.warn("Failed to convert record " + r.toString(), e);
return null;
}
}).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));