garyli1019 commented on a change in pull request #2300:
URL: https://github.com/apache/hudi/pull/2300#discussion_r547284907
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -125,19 +129,26 @@ private void init(HoodieRecord record) {
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
// Set the base commit time as the current instantTime for new inserts into log files
String baseInstantTime;
+ String baseFile = "";
+ List<String> logFiles = new ArrayList<>();
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
+ baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
} else {
baseInstantTime = instantTime;
// This means there is no base data file, start appending to a new log file
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
LOG.info("New InsertHandle for partition :" + partitionPath);
}
- writeStatus.getStat().setPrevCommit(baseInstantTime);
+ HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat();
+ deltaWriteStat.setPrevCommit(baseInstantTime);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
- writeStatus.getStat().setPartitionPath(partitionPath);
- writeStatus.getStat().setFileId(fileId);
+ deltaWriteStat.setPartitionPath(partitionPath);
+ deltaWriteStat.setFileId(fileId);
+ deltaWriteStat.setBaseFile(baseFile);
Review comment:
Right, since we got this information from the `fileSlice`.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -258,20 +265,32 @@ public WriteStatus close() {
// flush any remaining records to disk
doAppend(header);
+ String latestLogFile = "";
if (writer != null) {
sizeInBytes = writer.getCurrentSize();
+ latestLogFile = writer.getLogFile().getFileName();
+ filePath = partitionPath.length() == 0 ? new Path(latestLogFile).toString()
+ : new Path(partitionPath, latestLogFile).toString();
+ logVersion = writer.getLogFile().getLogVersion();
writer.close();
}
- HoodieWriteStat stat = writeStatus.getStat();
+ HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat();
stat.setFileId(this.fileId);
+ stat.setPath(this.filePath);
+ stat.setLogVersion(logVersion);
+ stat.setLogOffset(logOffset);
Review comment:
This offset should be 0 if we write to a new log file, and the size of the
existing log file if we append to it. Currently the offset does not seem to
work, because I don't see anywhere it could get the log file size.
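
For illustration, a minimal sketch of the offset rule described above. The
class `LogOffsetSketch` and the method `startOffset` are hypothetical names
used only for this example; they are not part of Hudi.

```java
import java.util.Optional;

/** Hypothetical helper, not Hudi API: picks the offset where this commit's bytes begin. */
public final class LogOffsetSketch {
    private LogOffsetSketch() {}

    /**
     * @param existingLogFileSize size in bytes of the log file being appended to,
     *                            or empty when a brand-new log file is created
     * @return 0 for a new log file, otherwise the size of the existing file
     */
    public static long startOffset(Optional<Long> existingLogFileSize) {
        return existingLogFileSize.orElse(0L);
    }
}
```

With this rule, a reader could seek straight to the returned offset to find the
blocks written by the current commit.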
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -258,20 +265,32 @@ public WriteStatus close() {
// flush any remaining records to disk
doAppend(header);
+ String latestLogFile = "";
if (writer != null) {
sizeInBytes = writer.getCurrentSize();
+ latestLogFile = writer.getLogFile().getFileName();
+ filePath = partitionPath.length() == 0 ? new Path(latestLogFile).toString()
+ : new Path(partitionPath, latestLogFile).toString();
+ logVersion = writer.getLogFile().getLogVersion();
writer.close();
}
- HoodieWriteStat stat = writeStatus.getStat();
+ HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat();
stat.setFileId(this.fileId);
+ stat.setPath(this.filePath);
+ stat.setLogVersion(logVersion);
+ stat.setLogOffset(logOffset);
stat.setNumWrites(recordsWritten);
stat.setNumUpdateWrites(updatedRecordsWritten);
stat.setNumInserts(insertRecordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setTotalWriteBytes(estimatedNumberOfBytesWritten);
stat.setFileSizeInBytes(sizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+ // update total log file list if the latest log file was new
+ if (!stat.getLogFiles().contains(latestLogFile)) {
Review comment:
I'm not quite sure about the rollover case. Is it possible for two log files
to be written in one commit? If we write two log files, should the `path`
contain both files as well?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -303,6 +322,7 @@ private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTim
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+ .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
Review comment:
I'm trying to store the offset in the writer so that it stays in sync with the
log version. When the log version changes, we can update the offset as well.
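
A rough sketch of that idea, under stated assumptions: the writer keeps the
log version and the start offset together and resets the offset whenever it
rolls over to a new version. `VersionedOffsetWriterSketch`, its fields, and the
size-based rollover rule are all hypothetical and only stand in for the real
log writer.

```java
/** Hypothetical stand-in for a log writer; none of these names are Hudi API. */
public final class VersionedOffsetWriterSketch {
    private final long maxFileSize; // assumed rollover threshold in bytes
    private int logVersion;         // version of the log file currently written to
    private long offset;            // where this commit's bytes start in that file
    private long currentSize;       // bytes currently in that file

    public VersionedOffsetWriterSketch(int logVersion, long existingFileSize, long maxFileSize) {
        this.logVersion = logVersion;
        this.offset = existingFileSize; // 0 for a new file, file size for an append
        this.currentSize = existingFileSize;
        this.maxFileSize = maxFileSize;
    }

    /** Appends numBytes, rolling over first if the current file is already full. */
    public void append(long numBytes) {
        if (currentSize >= maxFileSize) {
            // Version and offset change together, so they cannot drift apart.
            logVersion++;
            offset = 0L;
            currentSize = 0L;
        }
        currentSize += numBytes;
    }

    public int getLogVersion() {
        return logVersion;
    }

    public long getOffset() {
        return offset;
    }

    public long getCurrentSize() {
        return currentSize;
    }
}
```

Because both values live in the writer, the append handle could read the
version and the offset from one place when it fills in the write stat.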