abhisheksahani91 commented on issue #11739:
URL: https://github.com/apache/hudi/issues/11739#issuecomment-2277046725

   @ad1happy2go 
   From the source code, the compact method below in class HoodieCompactor is 
responsible for executing compaction:
   
    /**
      * Execute compaction operations and report back status.
      */
     public HoodieData<WriteStatus> compact(
         HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
         HoodieTable table, HoodieWriteConfig config, String compactionInstantTime,
         HoodieCompactionHandler compactionHandler) {
       if (compactionPlan == null || (compactionPlan.getOperations() == null)
           || (compactionPlan.getOperations().isEmpty())) {
         return context.emptyHoodieData();
       }
       HoodieActiveTimeline timeline = table.getActiveTimeline();
       HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
       // Mark instant as compaction inflight
       timeline.transitionCompactionRequestedToInflight(instant);
       table.getMetaClient().reloadActiveTimeline();
   
       HoodieTableMetaClient metaClient = table.getMetaClient();
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
   
       // Here we first use the table schema as the reader schema to read the
       // log files. That is because, in the case of MergeInto, config.getSchema
       // may not be the same as the table schema.
       try {
         if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
           Schema readerSchema = schemaResolver.getTableAvroSchema(false);
           config.setSchema(readerSchema.toString());
         }
       } catch (Exception e) {
         // If there is no commit in the table, just ignore the exception.
       }
   
       // Compacting is very similar to applying updates to existing files
       List<CompactionOperation> operations = compactionPlan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
       LOG.info("Compactor compacting " + operations + " files");
   
       context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
       TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
       return context.parallelize(operations).map(operation -> compact(
           compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier))
           .flatMap(List::iterator);
     }
     
     From the logs, we can see that the following files were selected for 
compaction:
     
     24/08/07 20:40:21 INFO HoodieCompactor: Compactor compacting 
[CompactionOperation{baseInstantTime='20240807183515584', 
dataFileCommitTime=Option{val=20240807183515584}, 
deltaFileNames=[.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.1_1-376-2470,
 .1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.2_1-435-2837, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.3_1-493-3203, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.4_1-551-3569, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.5_1-609-3935, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.6_1-667-4301, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.7_1-725-4667, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.8_1-783-5033, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.9_1-841-5399, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.10_1-912-5807, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.11_1-970-6173, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.12_1-1030-6542, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.13_1-1088-6908, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.14_1-1146-7274, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.15_1-1204-7640, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.16_1-1275-8048, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.17_1-1333-8414, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.18_1-1391-8785, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.19_1-1449-9156, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.20_1-1507-9527, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.21_1-1565-9898, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.22_1-1623-10269, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.23_1-1668-10567, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.24_1-1726-10938, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.25_1-1784-11309, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.26_1-1842-11680, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.27_1-1900-12051, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.28_1-1971-12464, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.29_1-2029-12835, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.30_1-2100-13248, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.31_1-2171-13661, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.32_1-2228-14031, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.33_1-2285-14401, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.34_1-2343-14772, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.35_1-2401-15143, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.36_1-2459-15514, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.37_1-2517-15885, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.38_1-2575-16256, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.39_1-2633-16627, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.40_1-2691-16998, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.41_1-2749-17369, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.42_1-2820-17782, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.43_1-2878-18153, 
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.44_1-2936-18524], 
dataFileName=Option{val=1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_0-0-0_20240807183515584.parquet},
 id='HoodieFileGroupId{partitionPath='', 
fileId='1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0'}', 
metrics={TOTAL_LOG_FILES=44.0, TOTAL_IO_READ_MB=4754.0, 
TOTAL_LOG_FILES_SIZE=4.875567816E9, TOTAL_IO_WRITE_MB=104.0, 
TOTAL_IO_MB=4858.0}, bootstrapFilePath=Optional.empty}] files
     
   Now, if we focus on these lines:
   
   context.parallelize(operations).map(operation -> compact(
           compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier))
           .flatMap(List::iterator);
           
   context.parallelize(operations) converts the list of compaction 
operations into a HoodieJavaRDD:

   @Override
     public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
       return HoodieJavaRDD.of(javaSparkContext.parallelize(data, parallelism));
     }
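   Note that each element of that RDD is one CompactionOperation (one file group), so the number of partitions carrying real work is bounded by the number of operations in the plan, regardless of the parallelism value. Below is a simplified, Spark-free sketch of contiguous-slice partitioning in that spirit; ParallelizeSketch and partition are hypothetical illustration names, not Spark or Hudi APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Spark itself) of slicing a list into `parallelism`
// contiguous partitions; each non-empty slice is where real work happens.
public class ParallelizeSketch {

    // Split `data` into `parallelism` contiguous slices. Slices may be empty
    // when there are fewer elements than partitions.
    public static <T> List<List<T>> partition(List<T> data, int parallelism) {
        List<List<T>> slices = new ArrayList<>();
        int n = data.size();
        for (int i = 0; i < parallelism; i++) {
            int start = (int) ((long) i * n / parallelism);
            int end = (int) ((long) (i + 1) * n / parallelism);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        // A compaction plan with a single CompactionOperation (as in the log
        // above, which shows one file group with 44 delta log files)...
        List<String> operations = List.of("fileGroup-1668c2db");
        List<List<String>> slices = partition(operations, 4);
        long nonEmpty = slices.stream().filter(s -> !s.isEmpty()).count();
        // ...leaves only one slice with any work in it.
        System.out.println("non-empty slices: " + nonEmpty);
    }
}
```

   Under this model, a plan containing a single CompactionOperation can occupy only one non-empty partition, so only one task performs the actual compaction no matter how many executors are available.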
      
      My doubt is: if Hudi is parallelizing the operations selected for 
compaction, why is the whole of stage 0 executed on a single executor/worker? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]