codope commented on code in PR #12091:
URL: https://github.com/apache/hudi/pull/12091#discussion_r1799775150
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -173,24 +174,30 @@ protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, F
     // HUDI-6994 will address this.
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
-
-    // Group FileSlices by partition
-    Map<String, List<FileSlice>> partitionToFileSlicesMap = partitionFileSlicePairs.stream()
-        .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
-    HoodieFunctionalIndex<Column, Column> functionalIndex =
-        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
-    List<HoodieRecord> allRecords = new ArrayList<>();
-    for (Map.Entry<String, List<FileSlice>> entry : partitionToFileSlicesMap.entrySet()) {
-      String partition = entry.getKey();
-      List<FileSlice> fileSlices = entry.getValue();
-      List<HoodieRecord> recordsForPartition = Collections.emptyList();
-      if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-        recordsForPartition = getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext);
-      } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
-        recordsForPartition = getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext, metadataWriteConfig);
-      }
-      allRecords.addAll(recordsForPartition);
-    }
+    ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
+    List<HoodieRecord> allRecords = customThreadPool.submit(() ->
Review Comment:
We can't use the engine context's parallelism here, because we need that context for creating the dataset that the Spark functions are applied to. So we're going with plain Java parallelism instead.
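
   To illustrate the "plain Java parallelism" approach the diff takes: a parallel stream runs its tasks on the `ForkJoinPool` that submitted it, so wrapping the stream in `customThreadPool.submit(...)` caps concurrency at the pool's size instead of using the shared common pool. Below is a minimal, self-contained sketch of that pattern with stand-in types and a hypothetical `processPartitions` helper (the real code maps partitions to index records):

   ```java
   import java.util.List;
   import java.util.concurrent.ForkJoinPool;
   import java.util.stream.Collectors;

   public class CustomPoolParallelism {
     // Process each partition on a bounded ForkJoinPool rather than the shared
     // ForkJoinPool.commonPool(). Parallel streams execute on the pool that
     // submitted them, so this caps concurrency at `parallelism`.
     static List<String> processPartitions(List<String> partitions, int parallelism) throws Exception {
       ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
       try {
         return customThreadPool.submit(() ->
             partitions.parallelStream()
                 // stand-in for per-partition index record generation
                 .map(p -> p + ":processed")
                 .collect(Collectors.toList())
         ).get();
       } finally {
         customThreadPool.shutdown(); // don't leak pool threads
       }
     }

     public static void main(String[] args) throws Exception {
       System.out.println(processPartitions(List.of("p1", "p2", "p3"), 2));
     }
   }
   ```

   Note that `collect(Collectors.toList())` preserves encounter order even on a parallel stream, so results come back in partition order regardless of which pool thread handled each one.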
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]