garyli1019 commented on a change in pull request #2300:
URL: https://github.com/apache/hudi/pull/2300#discussion_r547284907



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -125,19 +129,26 @@ private void init(HoodieRecord record) {
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
       // Set the base commit time as the current instantTime for new inserts 
into log files
       String baseInstantTime;
+      String baseFile = "";
+      List<String> logFiles = new ArrayList<>();
       if (fileSlice.isPresent()) {
         baseInstantTime = fileSlice.get().getBaseInstantTime();
+        baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+        logFiles = 
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
       } else {
         baseInstantTime = instantTime;
         // This means there is no base data file, start appending to a new log 
file
         fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, 
this.fileId));
         LOG.info("New InsertHandle for partition :" + partitionPath);
       }
-      writeStatus.getStat().setPrevCommit(baseInstantTime);
+      HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) 
writeStatus.getStat();
+      deltaWriteStat.setPrevCommit(baseInstantTime);
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
-      writeStatus.getStat().setPartitionPath(partitionPath);
-      writeStatus.getStat().setFileId(fileId);
+      deltaWriteStat.setPartitionPath(partitionPath);
+      deltaWriteStat.setFileId(fileId);
+      deltaWriteStat.setBaseFile(baseFile);

Review comment:
       Right, since we got this information from the `fileSlice`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -258,20 +265,32 @@ public WriteStatus close() {
       // flush any remaining records to disk
       doAppend(header);
 
+      String latestLogFile = "";
       if (writer != null) {
         sizeInBytes = writer.getCurrentSize();
+        latestLogFile = writer.getLogFile().getFileName();
+        filePath = partitionPath.length() == 0 ? new 
Path(latestLogFile).toString()
+            : new Path(partitionPath, latestLogFile).toString();
+        logVersion = writer.getLogFile().getLogVersion();
         writer.close();
       }
 
-      HoodieWriteStat stat = writeStatus.getStat();
+      HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat();
       stat.setFileId(this.fileId);
+      stat.setPath(this.filePath);
+      stat.setLogVersion(logVersion);
+      stat.setLogOffset(logOffset);

Review comment:
       This offset should be 0 if we write to a new log file, but it should be the file size 
of the existing log file if we append. Currently this offset does not seem to be 
working, because I don't see anywhere that it can obtain the log file size.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -258,20 +265,32 @@ public WriteStatus close() {
       // flush any remaining records to disk
       doAppend(header);
 
+      String latestLogFile = "";
       if (writer != null) {
         sizeInBytes = writer.getCurrentSize();
+        latestLogFile = writer.getLogFile().getFileName();
+        filePath = partitionPath.length() == 0 ? new 
Path(latestLogFile).toString()
+            : new Path(partitionPath, latestLogFile).toString();
+        logVersion = writer.getLogFile().getLogVersion();
         writer.close();
       }
 
-      HoodieWriteStat stat = writeStatus.getStat();
+      HoodieDeltaWriteStat stat = (HoodieDeltaWriteStat) writeStatus.getStat();
       stat.setFileId(this.fileId);
+      stat.setPath(this.filePath);
+      stat.setLogVersion(logVersion);
+      stat.setLogOffset(logOffset);
       stat.setNumWrites(recordsWritten);
       stat.setNumUpdateWrites(updatedRecordsWritten);
       stat.setNumInserts(insertRecordsWritten);
       stat.setNumDeletes(recordsDeleted);
       stat.setTotalWriteBytes(estimatedNumberOfBytesWritten);
       stat.setFileSizeInBytes(sizeInBytes);
       stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
+      // update total log file list if the latest log file was new
+      if (!stat.getLogFiles().contains(latestLogFile)) {

Review comment:
       Not quite sure about the rollover case. Is it possible that two log 
files will be written in one commit? If we write two log files, shouldn't the 
`path` contain 2 files as well? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -303,6 +322,7 @@ private Writer createLogWriter(Option<FileSlice> fileSlice, 
String baseCommitTim
         
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(),
 partitionPath))
         .withFileId(fileId).overBaseCommit(baseCommitTime)
         
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+        .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))

Review comment:
       Trying to store the offset in the writer, so it will stay in sync with the 
log version. When the log version changes, we can change the offset as 
well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to