zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r777892769



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable()) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content if it exists and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<FileStatus> mergeCandidate = new ArrayList<>();
+    int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    for (FileStatus fs: fsStatuses) {
+      if (fs.getLen() < smallFileLimitBytes) {
+        mergeCandidate.add(fs);
+      }
+      if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+        break;
+      }
+    }
+
+    if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // build the merge plan before merging the archive files
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete the archiveMergePlan, which marks the merge of small archive files as succeeded.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Get the final archive file name to write, based on whether the storage scheme supports append.
+   */
+  private String computeLogFileName() throws IOException {
+    if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      String logWriteToken = writer.getLogFile().getLogWriteToken();
+      HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+      return hoodieLogFile.getFileName();
+    } else {
+      return writer.getLogFile().getFileName();
+    }
+  }
+
+  /**
+   * Check for, and resolve, any failed and unfinished merge of small archive files.
+   * @param context HoodieEngineContext used to delete the small archive files in parallel if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable()) {

Review comment:
       Regarding enabling and disabling this feature:
   1. If it is disabled gracefully, this patch leaves nothing behind.
   2. If it is disabled ungracefully, duplicate archived instant records may be left behind until the feature is enabled again. However, I think this causes no damage, because this PR loads archived instants into a HashSet, so the duplicates are collapsed (a minimal sketch of that idea follows below).
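   
   A minimal sketch of the deduplication idea from point 2, assuming archived instants are modeled by a simple value class with proper `equals`/`hashCode`; both classes below are hypothetical illustrations, not the actual Hudi types:
   
   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Objects;
   
   // Hypothetical stand-in for an archived instant record; not the real Hudi class.
   final class ArchivedInstantRecord {
     final String timestamp;
     final String action;
   
     ArchivedInstantRecord(String timestamp, String action) {
       this.timestamp = timestamp;
       this.action = action;
     }
   
     @Override
     public boolean equals(Object o) {
       if (!(o instanceof ArchivedInstantRecord)) {
         return false;
       }
       ArchivedInstantRecord that = (ArchivedInstantRecord) o;
       return timestamp.equals(that.timestamp) && action.equals(that.action);
     }
   
     @Override
     public int hashCode() {
       return Objects.hash(timestamp, action);
     }
   }
   
   final class ArchivedInstantLoader {
     // Records read from the old small files and from the merged file may overlap after
     // an interrupted merge; collecting them into a set keeps each instant exactly once
     // (LinkedHashSet also preserves the read order).
     static List<ArchivedInstantRecord> dedupe(List<ArchivedInstantRecord> rawRecords) {
       return new ArrayList<>(new LinkedHashSet<>(rawRecords));
     }
   }
   ```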
   
   Why I think we need this switch before doing the cleanup work:
   1. This is a new feature, so it is safer to guard it with a default-false control here.
   2. I am pretty worried about multi-writer scenarios; at least this gives us a way to ensure that only one writer performs the merge work (a small sketch of that gating pattern is at the end of this comment).
   
   As a next step, maybe we can properly handle multi-writer scenarios and then remove this switch :)
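   
   To make the multi-writer point concrete, here is a hedged sketch of the default-false gating pattern; the class and config key below are hypothetical stand-ins, not the option this PR actually defines:
   
   ```java
   import java.util.Properties;
   
   // Hypothetical illustration of a default-false switch guarding the merge work.
   final class ArchiveMergeGate {
     // Made-up key name for the example; the real option lives in the PR's write config.
     static final String MERGE_ENABLE_KEY = "hoodie.archive.auto.merge.enable";
   
     private final Properties props;
   
     ArchiveMergeGate(Properties props) {
       this.props = props;
     }
   
     // Defaults to false, so only a writer that explicitly opts in runs the merge.
     boolean shouldMergeArchiveFiles() {
       return Boolean.parseBoolean(props.getProperty(MERGE_ENABLE_KEY, "false"));
     }
   }
   ```
   
   With something like this, operators enable the flag on exactly one writer, which is the interim way to guarantee a single merger until proper multi-writer handling lands.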



