This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0fc56fdd1 [MINOR] Refactor the init method of HoodieAppendHandle (#10693)
2c0fc56fdd1 is described below
commit 2c0fc56fdd11f471c67f4641cf48eb96fc2d0f04
Author: Fantasy-Jay <[email protected]>
AuthorDate: Tue Feb 20 09:59:30 2024 +0800
[MINOR] Refactor the init method of HoodieAppendHandle (#10693)
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 80 +++++++++++-----------
1 file changed, 41 insertions(+), 39 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f8cc77274c2..ffdb7c70a57 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -173,50 +173,52 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
}
private void init(HoodieRecord record) {
- if (doInit) {
- String prevCommit = instantTime;
- String baseFile = "";
- List<String> logFiles = new ArrayList<>();
- if (config.isCDCEnabled()) {
- // the cdc reader needs the base file metadata to have deterministic update sequence.
- TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
- Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
- if (fileSlice.isPresent()) {
- prevCommit = fileSlice.get().getBaseInstantTime();
- baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
- logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
- }
+ if (!doInit) {
+ return;
+ }
+
+ String prevCommit = instantTime;
+ String baseFile = "";
+ List<String> logFiles = new ArrayList<>();
+ if (config.isCDCEnabled()) {
+ // the cdc reader needs the base file metadata to have deterministic update sequence.
+ TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+ Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+ if (fileSlice.isPresent()) {
+ prevCommit = fileSlice.get().getBaseInstantTime();
+ baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
+ }
- // Prepare the first write status
- HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
- writeStatus.setStat(deltaWriteStat);
- writeStatus.setFileId(fileId);
- writeStatus.setPartitionPath(partitionPath);
- averageRecordSize = sizeEstimator.sizeEstimate(record);
+ // Prepare the first write status
+ HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
+ writeStatus.setStat(deltaWriteStat);
+ writeStatus.setFileId(fileId);
+ writeStatus.setPartitionPath(partitionPath);
+ averageRecordSize = sizeEstimator.sizeEstimate(record);
- deltaWriteStat.setPrevCommit(prevCommit);
- deltaWriteStat.setPartitionPath(partitionPath);
- deltaWriteStat.setFileId(fileId);
- deltaWriteStat.setBaseFile(baseFile);
- deltaWriteStat.setLogFiles(logFiles);
+ deltaWriteStat.setPrevCommit(prevCommit);
+ deltaWriteStat.setPartitionPath(partitionPath);
+ deltaWriteStat.setFileId(fileId);
+ deltaWriteStat.setBaseFile(baseFile);
+ deltaWriteStat.setLogFiles(logFiles);
- try {
- // Save hoodie partition meta in the partition path
- HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
- new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
- hoodieTable.getPartitionMetafileFormat());
- partitionMetadata.trySave(getPartitionId());
-
- this.writer = createLogWriter(getFileInstant(record));
- } catch (Exception e) {
- LOG.error("Error in update task at commit " + instantTime, e);
- writeStatus.setGlobalError(e);
- throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
- + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
- }
- doInit = false;
+ try {
+ // Save hoodie partition meta in the partition path
+ HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
+ new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
+ hoodieTable.getPartitionMetafileFormat());
+ partitionMetadata.trySave(getPartitionId());
+
+ this.writer = createLogWriter(getFileInstant(record));
+ } catch (Exception e) {
+ LOG.error("Error in update task at commit " + instantTime, e);
+ writeStatus.setGlobalError(e);
+ throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
}
+ doInit = false;
}
/**