yihua commented on code in PR #12390:
URL: https://github.com/apache/hudi/pull/12390#discussion_r1865590919


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -161,66 +162,97 @@ public List<WriteStatus> compact(HoodieCompactionHandler 
compactionHandler,
                                    Option<InstantRange> instantRange,
                                    TaskContextSupplier taskContextSupplier,
                                    CompactionExecutionHelper executionHelper) 
throws IOException {
-    HoodieStorage storage = metaClient.getStorage();
-    Schema readerSchema;
-    Option<InternalSchema> internalSchemaOption = Option.empty();
-    if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
-      readerSchema = new Schema.Parser().parse(config.getSchema());
-      internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
-      // its safe to modify config here, since we are running in task side.
-      ((HoodieTable) compactionHandler).getConfig().setDefault(config);
+    if 
(config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+        && compactionHandler.supportsFileGroupReader()) {
+      List<WriteStatus> writeStatusList = 
compactionHandler.runCompactionUsingFileGroupReader(instantTime,
+          operation.getPartitionPath(), operation.getFileId(), operation, 2);
+      writeStatusList
+          .forEach(s -> {
+            final HoodieWriteStat stat = s.getStat();
+            /*
+            fill in log reading stats
+            
stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+            stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+            stat.setTotalLogRecords(scanner.getTotalLogRecords());
+            stat.setPartitionPath(operation.getPartitionPath());
+            stat
+                
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
+            stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
+            stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+            stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
+            RuntimeStats runtimeStats = new RuntimeStats();
+            // scan time has to be obtained from scanner.
+            
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
+            // create and upsert time are obtained from the create or merge 
handle.
+            if (stat.getRuntimeStats() != null) {
+              
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
+              
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
+            }
+            stat.setRuntimeStats(runtimeStats);
+             */
+          });
+      return writeStatusList;
     } else {
-      readerSchema = HoodieAvroUtils.addMetadataFields(
-          new Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
-    }
-    LOG.info("Compaction operation started for base file: " + 
operation.getDataFileName() + " and delta files: " + 
operation.getDeltaFileNames()
-        + " for commit " + instantTime);
-    // TODO - FIX THIS
-    // Reads the entire avro file. Always only specific blocks should be read 
from the avro file
-    // (failure recover).
-    // Load all the delta commits since the last compaction commit and get all 
the blocks to be
-    // loaded and load it using CompositeAvroLogReader
-    // Since a DeltaCommit is not defined yet, reading all the records. 
revisit this soon.
+      HoodieStorage storage = metaClient.getStorage();

Review Comment:
   Yes. I'll extract this into a separate method to make the change easier to track.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to