yihua commented on code in PR #6432:
URL: https://github.com/apache/hudi/pull/6432#discussion_r951958281
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BloomIndexFileInfo.java:
##########
@@ -27,19 +27,20 @@
public class BloomIndexFileInfo implements Serializable {
private final String fileId;
-
+ private final String filename;
Review Comment:
Here we store the filename to avoid recomputation of the same piece of
information across stages.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation>
lookupIndex(
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList =
getBloomIndexFileInfoForPartitions(context, hoodieTable,
affectedPartitionPathList);
- final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
- fileInfoList.stream().collect(groupingBy(Pair::getLeft,
mapping(Pair::getRight, toList())));
+ // partition -> {file ID -> BloomIndexFileInfo instance}
+ final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream()
Review Comment:
We switch each value from a List to a Map so that the filename can be
retrieved by file ID in O(1) time.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -82,25 +87,33 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation>
findMatchingFilesForRecor
if (config.getBloomIndexUseMetadata()
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
.contains(BLOOM_FILTERS.getPartitionPath())) {
- // Step 1: Sort by file id
- JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
- fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+ // Step 1: Sort by bloom filter key (Hash ID of partition and file name)
in metadata table
+ JavaRDD<Tuple2<Tuple2<String, String>, HoodieKey>>
sortedFileIdAndKeyPairs =
+ fileComparisonsRDD
+ .sortBy(t -> HoodieMetadataPayload.getBloomFilterIndexKey(
+ new PartitionIndexID(t._1._1), new FileIndexID(t._1._2)),
true, joinParallelism);
// Step 2: Use bloom filter to filter and the actual log file to get the
record location
- keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex(
- new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true);
+ keyLookupResultRDD =
sortedFileIdAndKeyPairs.coalesce(config.getBloomIndexMetadataReadParallelism())
Review Comment:
This limits the parallelism of bloom filter fetching.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation>
lookupIndex(
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList =
getBloomIndexFileInfoForPartitions(context, hoodieTable,
affectedPartitionPathList);
- final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
- fileInfoList.stream().collect(groupingBy(Pair::getLeft,
mapping(Pair::getRight, toList())));
+ // partition -> {file ID -> BloomIndexFileInfo instance}
+ final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream()
+ .collect(groupingBy(Pair::getLeft, toMap(entry ->
entry.getRight().getFileId(), Pair::getRight)));
// Step 3: Obtain a HoodieData, for each incoming record, that already
exists, with the file id,
// that contains it.
- HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+ // Each entry: ((File ID, Filename), HoodieKey instance)
+ HoodieData<Pair<Pair<String, String>, HoodieKey>> fileComparisonPairs =
Review Comment:
Along the DAG, we pass down the file names so they don't need to be
recomputed at later stages.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -202,41 +219,57 @@ private List<Pair<String, BloomIndexFileInfo>>
getFileInfoForLatestBaseFiles(
* @param partitions - List of partitions for which column stats need to be
loaded
* @param context - Engine context
* @param hoodieTable - Hoodie table
+ * @param parallelism - Parallelism for reading column stats from metadata
table
* @return List of partition and file column range info pairs
*/
protected List<Pair<String, BloomIndexFileInfo>>
loadColumnRangesFromMetaIndex(
- List<String> partitions, final HoodieEngineContext context, final
HoodieTable<?, ?, ?, ?> hoodieTable) {
+ List<String> partitions, final HoodieEngineContext context,
+ final HoodieTable<?, ?, ?, ?> hoodieTable, int parallelism) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices: " + config.getTableName());
final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- return context.flatMap(partitions, partitionName -> {
- // Partition and file name pairs
- List<Pair<String, String>> partitionFileNameList =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
- hoodieTable).stream().map(baseFile -> Pair.of(partitionName,
baseFile.getFileName()))
- .sorted()
- .collect(toList());
- if (partitionFileNameList.isEmpty()) {
- return Stream.empty();
- }
- try {
- Map<Pair<String, String>, HoodieMetadataColumnStats>
fileToColumnStatsMap =
-
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
- List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
- for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry
: fileToColumnStatsMap.entrySet()) {
- result.add(Pair.of(entry.getKey().getLeft(),
- new BloomIndexFileInfo(
- FSUtils.getFileId(entry.getKey().getRight()),
- // NOTE: Here we assume that the type of the primary key
field is string
- (String)
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
- (String)
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
- )));
- }
- return result.stream();
- } catch (MetadataNotFoundException me) {
- throw new HoodieMetadataException("Unable to find column range
metadata for partition:" + partitionName, me);
- }
- }, Math.max(partitions.size(), 1));
+ return context.parallelize(partitions,
Math.max(Math.min(partitions.size(), parallelism), 1))
Review Comment:
This limits the parallelism of column stats fetching from the metadata table.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]