nsivabalan commented on code in PR #13383:
URL: https://github.com/apache/hudi/pull/13383#discussion_r2122394819


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -634,19 +636,15 @@ private Pair<Integer, HoodieData<HoodieRecord>> 
initializeExpressionIndexPartiti
       if (entry.getValue().getBaseFile().isPresent()) {
         partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), 
Pair.of(entry.getValue().getBaseFile().get().getPath(), 
entry.getValue().getBaseFile().get().getFileLen())));
       }
-      entry.getValue().getLogFiles().forEach(hoodieLogFile -> {
-        if (entry.getValue().getLogFiles().count() > 0) {
-          entry.getValue().getLogFiles().forEach(logfile -> {
-            partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), 
Pair.of(logfile.getPath().toString(), logfile.getFileSize())));
-          });
-        }
-      });
+      entry.getValue().getLogFiles()
+          .forEach(hoodieLogFile -> 
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), 
Pair.of(hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))));
     });
 
     int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
     int parallelism = Math.min(partitionFilePathSizeTriplet.size(), 
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
-    Schema readerSchema = 
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
-    return Pair.of(fileGroupCount, 
getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf, dataTableInstantTime));
+    Schema tableSchema = new 
TableSchemaResolver(dataMetaClient).getTableAvroSchema();

Review Comment:
   Not related to the changes in this patch as such, but it would be better to get it fixed.
   
   Can we get the table schema from within initializeFromFileSystem and pass it 
in to all init methods?
   like
   initializeFilesPartition
   initializeBloomFiltersPartition
   initializeColumnStatsPartition
   etc
   
   Whichever method tries to instantiate the table schema should instead have it 
passed in from the caller.
   
   The data table schema is never going to change while we are initializing 
multiple MDT partitions.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -88,33 +87,28 @@ protected boolean filterLogCompactionOperations() {
   }
 
   /**
-   * Can schedule logcompaction if log files count is greater than 4 or total 
log blocks is greater than 4.
+   * Can schedule logcompaction if log files count or total log blocks is 
greater than the configured threshold.
    * @param fileSlice File Slice under consideration.
+   * @param instantRange Range of valid instants.
    * @return Boolean value that determines whether log compaction will be 
scheduled or not.
    */
-  private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, 
String maxInstantTime,
+  private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice,
                                                       Option<InstantRange> 
instantRange) {
-    LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition "
-        + fileSlice.getPartitionPath() + " eligible for log compaction.");
+    LOG.info("Checking if fileId {} and partition {} eligible for log 
compaction.", fileSlice.getFileId(), fileSlice.getPartitionPath());
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-        .withStorage(metaClient.getStorage())
-        .withBasePath(hoodieTable.getMetaClient().getBasePath())
-        .withLogFilePaths(fileSlice.getLogFiles()
-            .sorted(HoodieLogFile.getLogFileComparator())
-            .map(file -> file.getPath().toString())
-            .collect(Collectors.toList()))
-        .withLatestInstantTime(maxInstantTime)
-        .withInstantRange(instantRange)
-        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withOptimizedLogBlocksScan(true)
-        .withRecordMerger(writeConfig.getRecordMerger())
-        .withTableMetaClient(metaClient)
-        .build();
-    scanner.scan(true);
+    long numLogFiles = fileSlice.getLogFiles().count();
+    if (numLogFiles >= writeConfig.getLogCompactionBlocksThreshold()) {

Review Comment:
   For v8, we don't even need to proceed further.
   Each log file will have only one log block,
   so we can return right away.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -88,33 +87,28 @@ protected boolean filterLogCompactionOperations() {
   }
 
   /**
-   * Can schedule logcompaction if log files count is greater than 4 or total 
log blocks is greater than 4.
+   * Can schedule logcompaction if log files count or total log blocks is 
greater than the configured threshold.
    * @param fileSlice File Slice under consideration.
+   * @param instantRange Range of valid instants.
    * @return Boolean value that determines whether log compaction will be 
scheduled or not.
    */
-  private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, 
String maxInstantTime,
+  private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice,
                                                       Option<InstantRange> 
instantRange) {
-    LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition "
-        + fileSlice.getPartitionPath() + " eligible for log compaction.");
+    LOG.info("Checking if fileId {} and partition {} eligible for log 
compaction.", fileSlice.getFileId(), fileSlice.getPartitionPath());
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
-        .withStorage(metaClient.getStorage())
-        .withBasePath(hoodieTable.getMetaClient().getBasePath())
-        .withLogFilePaths(fileSlice.getLogFiles()
-            .sorted(HoodieLogFile.getLogFileComparator())
-            .map(file -> file.getPath().toString())
-            .collect(Collectors.toList()))
-        .withLatestInstantTime(maxInstantTime)
-        .withInstantRange(instantRange)
-        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withOptimizedLogBlocksScan(true)
-        .withRecordMerger(writeConfig.getRecordMerger())
-        .withTableMetaClient(metaClient)
-        .build();
-    scanner.scan(true);
+    long numLogFiles = fileSlice.getLogFiles().count();
+    if (numLogFiles >= writeConfig.getLogCompactionBlocksThreshold()) {
+      LOG.info("Total logs files ({}) is greater than log blocks threshold is 
{}", numLogFiles, writeConfig.getLogCompactionBlocksThreshold());
+      return true;
+    }
+    HoodieLogBlockMetadataScanner scanner = new 
HoodieLogBlockMetadataScanner(metaClient, fileSlice.getLogFiles()
+        .sorted(HoodieLogFile.getLogFileComparator())
+        .map(file -> file.getPath().toString())
+        .collect(Collectors.toList()),
+        writeConfig.getMaxDFSStreamBufferSize(),
+        instantRange);

Review Comment:
   Are we removing `maxInstantTime` when reading from the 
LogRecordReader or Scanner?
   For version 6, we need to retain it.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to