codope commented on code in PR #12091:
URL: https://github.com/apache/hudi/pull/12091#discussion_r1799775150
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -173,24 +174,30 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
     // HUDI-6994 will address this.
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
-
-    // Group FileSlices by partition
-    Map<String, List<FileSlice>> partitionToFileSlicesMap = partitionFileSlicePairs.stream()
-        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
-    HoodieFunctionalIndex<Column, Column> functionalIndex =
-        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
-    List<HoodieRecord> allRecords = new ArrayList<>();
-    for (Map.Entry<String, List<FileSlice>> entry : partitionToFileSlicesMap.entrySet()) {
-      String partition = entry.getKey();
-      List<FileSlice> fileSlices = entry.getValue();
-      List<HoodieRecord> recordsForPartition = Collections.emptyList();
-      if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-        recordsForPartition = getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext);
-      } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
-        recordsForPartition = getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext, metadataWriteConfig);
-      }
-      allRecords.addAll(recordsForPartition);
-    }
+    ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
+    List<HoodieRecord> allRecords = customThreadPool.submit(() ->
Review Comment:
We can't use the engine context's parallelism here, because we need that context for the dataset creation that the Spark functions are applied to. So we go with plain Java parallelism instead, which is still better than the previous sequential stats computation.
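To illustrate the pattern the diff switches to, here is a minimal, self-contained sketch of submitting a parallel stream to a dedicated `ForkJoinPool` so the work uses a chosen parallelism rather than the shared `commonPool()`. The partition-to-files map and the string "records" are hypothetical stand-ins for the `FileSlice` grouping and stats records in the PR, not Hudi APIs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class ForkJoinPoolSketch {

  // Hypothetical stand-in for the per-partition stats computation: fans each
  // partition's file list out over a dedicated ForkJoinPool, so the parallel
  // stream runs at the requested parallelism instead of on commonPool().
  static List<String> collectRecords(Map<String, List<String>> partitionToFiles,
                                     int parallelism)
      throws ExecutionException, InterruptedException {
    ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
    try {
      return customThreadPool.submit(() ->
          partitionToFiles.entrySet().parallelStream()
              .flatMap(e -> e.getValue().stream()
                  .map(f -> e.getKey() + ":" + f))  // one "record" per file
              .collect(Collectors.toList())
      ).get();  // blocks until all partitions are processed
    } finally {
      customThreadPool.shutdown();  // always release the dedicated pool
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, List<String>> input = new TreeMap<>();
    input.put("2024/01", Arrays.asList("f1", "f2"));
    input.put("2024/02", Collections.singletonList("f3"));
    List<String> records = new ArrayList<>(collectRecords(input, 4));
    Collections.sort(records);  // parallel collection order is not guaranteed
    System.out.println(records);  // prints [2024/01:f1, 2024/01:f2, 2024/02:f3]
  }
}
```

Wrapping the lambda in `submit(...).get()` is what confines the parallel stream to the custom pool; the `finally` block avoids leaking pool threads if the computation throws.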
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]