danny0405 commented on code in PR #13383:
URL: https://github.com/apache/hudi/pull/13383#discussion_r2128028040


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -88,33 +88,32 @@ protected boolean filterLogCompactionOperations() {
   }
 
   /**
-   * Can schedule logcompaction if log files count is greater than 4 or total log blocks is greater than 4.
+   * Can schedule logcompaction if log files count or total log blocks is greater than the configured threshold.
    * @param fileSlice File Slice under consideration.
+   * @param instantRange Range of valid instants.
    * @return Boolean value that determines whether log compaction will be scheduled or not.
    */
   private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String maxInstantTime,
                                                       Option<InstantRange> instantRange) {
-    LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition "
-        + fileSlice.getPartitionPath() + " eligible for log compaction.");
+    LOG.info("Checking if fileId {} and partition {} eligible for log compaction.", fileSlice.getFileId(), fileSlice.getPartitionPath());
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
-        .withStorage(metaClient.getStorage())
-        .withBasePath(hoodieTable.getMetaClient().getBasePath())
-        .withLogFilePaths(fileSlice.getLogFiles()
-            .sorted(HoodieLogFile.getLogFileComparator())
-            .map(file -> file.getPath().toString())
-            .collect(Collectors.toList()))
-        .withLatestInstantTime(maxInstantTime)
-        .withInstantRange(instantRange)
-        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withOptimizedLogBlocksScan(true)
-        .withRecordMerger(writeConfig.getRecordMerger())
-        .withTableMetaClient(metaClient)
-        .build();
-    scanner.scan(true);
+    long numLogFiles = fileSlice.getLogFiles().count();
+    if (numLogFiles >= writeConfig.getLogCompactionBlocksThreshold()) {
+      LOG.info("Total logs files ({}) is greater than log blocks threshold is {}", numLogFiles, writeConfig.getLogCompactionBlocksThreshold());
+      return true;
+    } else if (hoodieTable.getMetaClient().getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      // for table version 8 and above, we assume a single log block per log file

Review Comment:
   > and also we have disabled appends to same log file in tbl v 8.
   
   Appends are disabled, but a single input data set can still be flushed into 
multiple log blocks, depending on the configured threshold. (Flink has disabled 
this multiple flush on master, but the other engines have not.)
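
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Hudi classes: `LogFileSketch`, `LogCompactionEligibilitySketch`, and both method names are hypothetical stand-ins, and the per-file block counts are made-up inputs. It only shows how a check that assumes one log block per log file can disagree with a check that counts blocks when a file holds several.

```java
import java.util.List;

// Hypothetical stand-in for a log file; NOT a Hudi class.
final class LogFileSketch {
  final String name;
  final int numBlocks; // assumed to be known; in practice it would have to be read from the file

  LogFileSketch(String name, int numBlocks) {
    this.name = name;
    this.numBlocks = numBlocks;
  }
}

// Hypothetical eligibility checks for illustration only.
final class LogCompactionEligibilitySketch {

  // Treats every log file as exactly one log block.
  static boolean eligibleAssumingOneBlockPerFile(List<LogFileSketch> files, int threshold) {
    return files.size() >= threshold;
  }

  // Checks the file count first, then falls back to the real block total.
  static boolean eligibleCountingBlocks(List<LogFileSketch> files, int threshold) {
    if (files.size() >= threshold) {
      return true;
    }
    long totalBlocks = 0;
    for (LogFileSketch file : files) {
      totalBlocks += file.numBlocks;
    }
    return totalBlocks >= threshold;
  }

  public static void main(String[] args) {
    // Two log files, each flushed as more than one block.
    List<LogFileSketch> files = List.of(
        new LogFileSketch("log.1", 3),
        new LogFileSketch("log.2", 2));
    int threshold = 4;

    System.out.println(eligibleAssumingOneBlockPerFile(files, threshold)); // prints "false"
    System.out.println(eligibleCountingBlocks(files, threshold));          // prints "true"
  }
}
```

With two files holding five blocks in total and a threshold of 4, the one-block-per-file assumption skips a file slice that the block count would have scheduled.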


