abhisheksahani91 commented on issue #11739:
URL: https://github.com/apache/hudi/issues/11739#issuecomment-2277046725
@ad1happy2go
From the source code, the `compact` method below in the `HoodieCompactor` class is responsible for executing compaction:
```java
/**
 * Execute compaction operations and report back status.
 */
public HoodieData<WriteStatus> compact(
    HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
    HoodieTable table, HoodieWriteConfig config, String compactionInstantTime,
    HoodieCompactionHandler compactionHandler) {
  if (compactionPlan == null || (compactionPlan.getOperations() == null)
      || (compactionPlan.getOperations().isEmpty())) {
    return context.emptyHoodieData();
  }
  HoodieActiveTimeline timeline = table.getActiveTimeline();
  HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
  // Mark instant as compaction inflight
  timeline.transitionCompactionRequestedToInflight(instant);
  table.getMetaClient().reloadActiveTimeline();

  HoodieTableMetaClient metaClient = table.getMetaClient();
  TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);

  // Here we first use the table schema as the reader schema to read the log files,
  // because in the MergeInto case config.getSchema may not match the table schema.
  try {
    if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
      Schema readerSchema = schemaResolver.getTableAvroSchema(false);
      config.setSchema(readerSchema.toString());
    }
  } catch (Exception e) {
    // If there is no commit in the table, just ignore the exception.
  }

  // Compacting is very similar to applying updates to an existing file
  List<CompactionOperation> operations = compactionPlan.getOperations().stream()
      .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
  LOG.info("Compactor compacting " + operations + " files");

  context.setJobStatus(this.getClass().getSimpleName(),
      "Compacting file slices: " + config.getTableName());
  TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
  return context.parallelize(operations)
      .map(operation -> compact(compactionHandler, metaClient, config, operation,
          compactionInstantTime, taskContextSupplier))
      .flatMap(List::iterator);
}
```
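The final return statement can be sketched with plain Java streams (assumption: this is only an analogy for the data flow; Spark distributes the map across executors rather than local threads, and `compactOne` below is a hypothetical stand-in for the per-operation `compact(...)` call):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the map + flatMap pattern: each compaction operation independently
// produces a list of write statuses, and the lists are flattened into one result.
public class CompactFlowSketch {
  // Hypothetical stand-in for compact(...): one operation -> several statuses.
  static List<String> compactOne(String operation) {
    return Arrays.asList(operation + "-status-0", operation + "-status-1");
  }

  public static void main(String[] args) {
    List<String> operations = Arrays.asList("op-a", "op-b");
    // parallelStream() plays the role of context.parallelize(operations) here.
    List<String> statuses = operations.parallelStream()
        .map(CompactFlowSketch::compactOne)
        .flatMap(List::stream)   // analogous to .flatMap(List::iterator)
        .collect(Collectors.toList());
    System.out.println(statuses.size()); // 2 operations x 2 statuses = 4
  }
}
```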
From the logs, we can see that the following file slice (one file group with 44 log files) is selected for compaction:
```
24/08/07 20:40:21 INFO HoodieCompactor: Compactor compacting
[CompactionOperation{baseInstantTime='20240807183515584',
dataFileCommitTime=Option{val=20240807183515584},
deltaFileNames=[.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.1_1-376-2470,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.2_1-435-2837,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.3_1-493-3203,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.4_1-551-3569,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.5_1-609-3935,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.6_1-667-4301,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.7_1-725-4667,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.8_1-783-5033,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.9_1-841-5399,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.10_1-912-5807,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.11_1-970-6173,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.12_1-1030-6542,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.13_1-1088-6908,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.14_1-1146-7274,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.15_1-1204-7640,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.16_1-1275-8048,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.17_1-1333-8414,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.18_1-1391-8785,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.19_1-1449-9156,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.20_1-1507-9527,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.21_1-1565-9898,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.22_1-1623-10269,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.23_1-1668-10567,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.24_1-1726-10938,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.25_1-1784-11309,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.26_1-1842-11680,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.27_1-1900-12051,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.28_1-1971-12464,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.29_1-2029-12835,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.30_1-2100-13248,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.31_1-2171-13661,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.32_1-2228-14031,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.33_1-2285-14401,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.34_1-2343-14772,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.35_1-2401-15143,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.36_1-2459-15514,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.37_1-2517-15885,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.38_1-2575-16256,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.39_1-2633-16627,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.40_1-2691-16998,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.41_1-2749-17369,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.42_1-2820-17782,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.43_1-2878-18153,
.1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_20240807183515584.log.44_1-2936-18524],
dataFileName=Option{val=1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0_0-0-0_20240807183515584.parquet},
id='HoodieFileGroupId{partitionPath='', fileId='1668c2db-6f1d-4b92-9ba9-d25115fe3b74-0'}',
metrics={TOTAL_LOG_FILES=44.0, TOTAL_IO_READ_MB=4754.0, TOTAL_LOG_FILES_SIZE=4.875567816E9,
TOTAL_IO_WRITE_MB=104.0, TOTAL_IO_MB=4858.0}, bootstrapFilePath=Optional.empty}] files
```
Now, focusing on this part of the code:

```java
context.parallelize(operations)
    .map(operation -> compact(compactionHandler, metaClient, config, operation,
        compactionInstantTime, taskContextSupplier))
    .flatMap(List::iterator);
```

`context.parallelize(operations)` converts the list of selected operations into a `HoodieJavaRDD`:

```java
@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
  return HoodieJavaRDD.of(javaSparkContext.parallelize(data, parallelism));
}
```
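As far as I understand, Spark's `parallelize(data, numSlices)` splits the list into partitions by index ranges, so the number of non-empty partitions can never exceed the number of elements. A simplified, self-contained model of that slicing (assumption: this mirrors the index arithmetic of Spark's `ParallelCollectionRDD`, not actual Hudi or Spark code):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how parallelize(data, numSlices) assigns list elements
// to partitions. With a single-element list, only one partition is non-empty,
// so only one task has any work to do.
public class ParallelizeSketch {
  static <T> List<List<T>> slice(List<T> data, int numSlices) {
    List<List<T>> slices = new ArrayList<>();
    int n = data.size();
    for (int i = 0; i < numSlices; i++) {
      // Slice i covers the index range [i*n/numSlices, (i+1)*n/numSlices)
      int start = (int) ((long) i * n / numSlices);
      int end = (int) ((long) (i + 1) * n / numSlices);
      slices.add(new ArrayList<>(data.subList(start, end)));
    }
    return slices;
  }

  public static void main(String[] args) {
    List<List<String>> slices = slice(List.of("op-1"), 4);
    long nonEmpty = slices.stream().filter(s -> !s.isEmpty()).count();
    System.out.println(nonEmpty); // only 1 of the 4 partitions holds the operation
  }
}
```

Note that the log above shows a single `CompactionOperation` in the plan, so under this model at most one partition would be non-empty regardless of the parallelism value.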
My doubt is: if Hudi is parallelizing the files selected for compaction, why is the whole of stage 0 executed on a single executor/worker?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]