vinothchandar commented on a change in pull request #2300:
URL: https://github.com/apache/hudi/pull/2300#discussion_r546991978
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -125,19 +129,26 @@ private void init(HoodieRecord record) {
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
// Set the base commit time as the current instantTime for new inserts
into log files
String baseInstantTime;
+ String baseFile = "";
+ List<String> logFiles = new ArrayList<>();
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
+ baseFile =
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles =
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
} else {
baseInstantTime = instantTime;
// This means there is no base data file, start appending to a new log
file
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime,
this.fileId));
LOG.info("New InsertHandle for partition :" + partitionPath);
Review comment:
While we are at it, can we fix this log statement? It should read `New AppendHandle for .. `
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -125,19 +129,26 @@ private void init(HoodieRecord record) {
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
// Set the base commit time as the current instantTime for new inserts
into log files
String baseInstantTime;
+ String baseFile = "";
+ List<String> logFiles = new ArrayList<>();
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseInstantTime();
+ baseFile =
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+ logFiles =
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
} else {
baseInstantTime = instantTime;
// This means there is no base data file, start appending to a new log
file
fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime,
this.fileId));
LOG.info("New InsertHandle for partition :" + partitionPath);
}
- writeStatus.getStat().setPrevCommit(baseInstantTime);
+ HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat)
writeStatus.getStat();
+ deltaWriteStat.setPrevCommit(baseInstantTime);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
- writeStatus.getStat().setPartitionPath(partitionPath);
- writeStatus.getStat().setFileId(fileId);
+ deltaWriteStat.setPartitionPath(partitionPath);
+ deltaWriteStat.setFileId(fileId);
+ deltaWriteStat.setBaseFile(baseFile);
Review comment:
so this sets the `baseFile` and the `logFiles` that existed before this
delta commit?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -258,20 +265,32 @@ public WriteStatus close() {
// flush any remaining records to disk
doAppend(header);
+ String latestLogFile = "";
if (writer != null) {
sizeInBytes = writer.getCurrentSize();
+ latestLogFile = writer.getLogFile().getFileName();
+ filePath = partitionPath.length() == 0 ? new
Path(latestLogFile).toString()
+ : new Path(partitionPath, latestLogFile).toString();
+ logVersion = writer.getLogFile().getLogVersion();
writer.close();
}
- HoodieWriteStat stat = writeStatus.getStat();
+ HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat();
stat.setFileId(this.fileId);
+ stat.setPath(this.filePath);
+ stat.setLogVersion(logVersion);
+ stat.setLogOffset(logOffset);
stat.setNumWrites(recordsWritten);
stat.setNumUpdateWrites(updatedRecordsWritten);
stat.setNumInserts(insertRecordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setTotalWriteBytes(estimatedNumberOfBytesWritten);
stat.setFileSizeInBytes(sizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+ // update total log file list if the latest log file was new
+ if (!stat.getLogFiles().contains(latestLogFile)) {
Review comment:
What if we had rolled over the log files in between, i.e. created more
than one new log file during the write? I think we should add to the list of log
files during every rollOver, not just at close.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -258,20 +265,32 @@ public WriteStatus close() {
// flush any remaining records to disk
doAppend(header);
+ String latestLogFile = "";
if (writer != null) {
sizeInBytes = writer.getCurrentSize();
+ latestLogFile = writer.getLogFile().getFileName();
+ filePath = partitionPath.length() == 0 ? new
Path(latestLogFile).toString()
+ : new Path(partitionPath, latestLogFile).toString();
+ logVersion = writer.getLogFile().getLogVersion();
writer.close();
}
- HoodieWriteStat stat = writeStatus.getStat();
+ HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat();
stat.setFileId(this.fileId);
+ stat.setPath(this.filePath);
+ stat.setLogVersion(logVersion);
+ stat.setLogOffset(logOffset);
Review comment:
this offset would still point to the file at which the write started?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -87,6 +88,9 @@
private long averageRecordSize = 0;
private HoodieLogFile currentLogFile;
private Writer writer;
+ private String filePath = "null";
+ private int logVersion = 0;
Review comment:
In general, naming these variables with some context, along with some
documentation, would help the readability of this file,
e.g. `initialLogVersion`, `filePath` -> `logFilePathWritten`, etc.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -303,6 +322,7 @@ private Writer createLogWriter(Option<FileSlice> fileSlice,
String baseCommitTim
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+ .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
Review comment:
why is this change needed?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]