zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777895112



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable()) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<FileStatus> mergeCandidate = new ArrayList<>();
+    int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    for (FileStatus fs: fsStatuses) {
+      if (fs.getLen() < smallFileLimitBytes) {
+        mergeCandidate.add(fs);
+      }
+      if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+        break;
+      }
+    }
+
+    if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete archiveMergePlan which means merge small archive 
files operatin is succeed.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Get final written archive file name based on storageSchemes support 
append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      String logWriteToken = writer.getLogFile().getLogWriteToken();
+      HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+      return hoodieLogFile.getFileName();
+    } else {
+      return writer.getLogFile().getFileName();

Review comment:
       > Do we create a new log block in log10 which will contain all merged 
log blocks from log1 to log10 ? Or should we explicitly create a new log file 
(log11)
   
   Nice catch! It is better for option 2 that we create a new log11 each time 
during merging small archive files.
   
   > Do we even need to consider adding this new feature for storage schemes 
where append is supported. In other words, should we consider enabling this 
feature just for storage schemes where append is not supported. and leave it as 
no-op for storage schemes appends are supported.
   
   It's better to enable this feature just for storage schemes where append is 
not supported




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to