yihua commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1089530979
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +82,92 @@ public static SparkHoodieBloomIndexHelper getInstance() {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
-        + config.getBloomIndexParallelism() + "}");
+    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).getNumPartitions();
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
             .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for the following goals:
+      //   - (G1) All requests to MT are made in batch (ie we're trying to fetch all the values
+      //     for corresponding keys at once)
+      //   - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters
+      //     partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ the Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in the MT Bloom Filters partition. To provide for that
+      // we need to make sure
+      //   - Spark's [[Partitioner]] employs the same hashing function as the Metadata Table (as well
+      //     as being applied to the same keys as the MT one)
+      //   - The # of partitions is congruent to the # of file-groups (ie the number of Spark
+      //     partitions is a multiple of the # of the file-groups).
+      //
+      // The last provision is necessary so that for every key it's the case that
+      //
+      //    (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+      //
+      // Let's take an example of N = 8 and M = 4 (the default # of file-groups in the Bloom Filter
+      // partition). In that case Spark partitions for which `hash(key) % N` is either 0
+      // or 4 will map to the same (first) file-group in the MT
+      int bloomFilterPartitionFileGroupCount =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+      int adjustedTargetParallelism =
+          targetParallelism % bloomFilterPartitionFileGroupCount == 0
+              ? targetParallelism
+              // NOTE: We add 1 to make sure the parallelism a) always stays positive and b)
+              //       {@code targetParallelism <= adjustedTargetParallelism}
+              : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * bloomFilterPartitionFileGroupCount;
Review Comment:
Make sure we benchmark the write workload with this new logic before landing
the PR.
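
For what it's worth, the round-up-to-a-multiple adjustment in the hunk above can be exercised in isolation; this is a minimal sketch (the class and method names here are illustrative, not Hudi API):

```java
public class ParallelismRounding {
  // Round targetParallelism up to the nearest positive multiple of fileGroupCount,
  // mirroring the adjustment in the hunk above: a value that is already a multiple
  // is kept as-is, otherwise (target / count + 1) * count is used.
  static int adjustTargetParallelism(int targetParallelism, int fileGroupCount) {
    return targetParallelism % fileGroupCount == 0
        ? targetParallelism
        : (targetParallelism / fileGroupCount + 1) * fileGroupCount;
  }

  public static void main(String[] args) {
    // Already a multiple of 4: unchanged.
    System.out.println(adjustTargetParallelism(8, 4));   // 8
    // 10 rounds up to the next multiple of 4.
    System.out.println(adjustTargetParallelism(10, 4));  // 12
    // Even a tiny target stays positive thanks to the "+ 1".
    System.out.println(adjustTargetParallelism(1, 4));   // 4
  }
}
```

Note the result is always >= targetParallelism for positive inputs, which is the invariant the `{@code targetParallelism <= adjustedTargetParallelism}` note in the diff promises.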
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -113,7 +113,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
-    GenericRecord record = HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+    GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema);
Review Comment:
nit: I see the other irrelevant changes have been reverted, but this one is still here. If you think this is a performance improvement, we can keep it in this PR.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -123,8 +126,10 @@ public List<String> getAllPartitionPaths() throws IOException {
         throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e);
       }
     }
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(),
-        metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
+
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllPartitionPaths();
Review Comment:
Makes sense.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +82,92 @@ public static SparkHoodieBloomIndexHelper getInstance() {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
-        + config.getBloomIndexParallelism() + "}");
+    int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).getNumPartitions();
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
            .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for the following goals:
+      //   - (G1) All requests to MT are made in batch (ie we're trying to fetch all the values
+      //     for corresponding keys at once)
+      //   - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters
+      //     partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ the Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in the MT Bloom Filters partition. To provide for that
+      // we need to make sure
+      //   - Spark's [[Partitioner]] employs the same hashing function as the Metadata Table (as well
+      //     as being applied to the same keys as the MT one)
+      //   - The # of partitions is congruent to the # of file-groups (ie the number of Spark
+      //     partitions is a multiple of the # of the file-groups).
+      //
+      // The last provision is necessary so that for every key it's the case that
+      //
+      //    (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+      //
+      // Let's take an example of N = 8 and M = 4 (the default # of file-groups in the Bloom Filter
+      // partition). In that case Spark partitions for which `hash(key) % N` is either 0
+      // or 4 will map to the same (first) file-group in the MT
+      int bloomFilterPartitionFileGroupCount =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+      int adjustedTargetParallelism =
+          targetParallelism % bloomFilterPartitionFileGroupCount == 0
+              ? targetParallelism
+              // NOTE: We add 1 to make sure the parallelism a) always stays positive and b)
+              //       {@code targetParallelism <= adjustedTargetParallelism}
+              : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * bloomFilterPartitionFileGroupCount;
Review Comment:
One caveat is that if the `targetParallelism` is large, it may still
overload the S3 bucket if the number of Spark executors is large (each executor
reading metadata table's bloom_filters partition).
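
As an aside, the congruence property the affinity argument in the hunk rests on, `(hash(key) % N) % M = hash(key) % M` whenever `N % M = 0`, is easy to check with a small standalone snippet (plain Java, no Hudi types; the class name is illustrative):

```java
public class AffinityCongruenceCheck {
  // Non-negative modulo: Java's % operator can return negative values for
  // negative hash codes, so Math.floorMod is used to normalize.
  static int mod(int value, int modulus) {
    return Math.floorMod(value, modulus);
  }

  public static void main(String[] args) {
    int n = 8;  // number of Spark partitions (a multiple of m)
    int m = 4;  // number of metadata-table file groups
    for (int hash = -100; hash <= 100; hash++) {
      // A key assigned to Spark partition (hash % n) still maps to the same
      // file group (hash % m): the two-level hashing stays affine.
      if (mod(mod(hash, n), m) != mod(hash, m)) {
        throw new AssertionError("congruence violated for hash " + hash);
      }
    }
    System.out.println("congruence holds for n=" + n + ", m=" + m);
  }
}
```

With `n = 10` and `m = 4` (where `n % m != 0`) the check fails, e.g. for `hash = 4`: `(4 % 10) % 4 = 0` but `4 % 4 = 0` holds, while for `hash = 14`: `(14 % 10) % 4 = 0` yet `14 % 4 = 2`, which is exactly why the parallelism has to be rounded to a multiple of the file-group count.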
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -115,27 +181,124 @@ public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecor
   /**
    * Compute the estimated number of bloom filter comparisons to be performed on each file group.
    */
-  private Map<String, Long> computeComparisonsPerFileGroup(
+  private Map<HoodieFileGroupId, Long> computeComparisonsPerFileGroup(
       final HoodieWriteConfig config,
       final Map<String, Long> recordsPerPartition,
       final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
-      final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+      final JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
       final HoodieEngineContext context) {
-    Map<String, Long> fileToComparisons;
+    Map<HoodieFileGroupId, Long> fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine comparisons
       // FIX(vc): Only do sampling here and extrapolate?
       context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName());
-      fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
+      fileToComparisons = fileComparisonsRDD.countByKey();
     } else {
       fileToComparisons = new HashMap<>();
-      partitionToFileInfo.forEach((key, value) -> {
-        for (BloomIndexFileInfo fileInfo : value) {
+      partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
+        for (BloomIndexFileInfo fileInfo : fileInfos) {
           // each file needs to be compared against all the records coming into the partition
-          fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key));
+          fileToComparisons.put(
+              new HoodieFileGroupId(partitionPath, fileInfo.getFileId()), recordsPerPartition.get(partitionPath));
         }
       });
     }
     return fileToComparisons;
   }
+
+  private static HoodieTableFileSystemView getBaseFileOnlyView(HoodieTable<?, ?, ?, ?> hoodieTable, Collection<String> partitionPaths) {
+    try {
+      List<String> fullPartitionPaths = partitionPaths.stream()
+          .map(partitionPath ->
+              String.format("%s/%s", hoodieTable.getMetaClient().getBasePathV2(), partitionPath))
+          .collect(Collectors.toList());
+
+      FileStatus[] allFiles =
+          hoodieTable.getMetadataTable().getAllFilesInPartitions(fullPartitionPaths).values().stream()
+              .flatMap(Arrays::stream)
+              .toArray(FileStatus[]::new);
+
+      return new HoodieTableFileSystemView(hoodieTable.getMetaClient(), hoodieTable.getActiveTimeline(), allFiles);
+    } catch (IOException e) {
+      LOG.error(String.format("Failed to fetch all files for partitions (%s)", partitionPaths));
+      throw new HoodieIOException("Failed to fetch all files for partitions", e);
+    }
+  }
+
+  static class AffineBloomIndexFileGroupPartitioner extends Partitioner {
+
+    private final Broadcast<HoodieTableFileSystemView> latestBaseFilesBroadcast;
+
+    // TODO(HUDI-5619) remove when addressed
+    private final Map<String, Map<String, String>> cachedLatestBaseFileNames =
+        new HashMap<>(16);
Review Comment:
Any reason for using `16` instead of a bigger number?
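
For context on the capacity question: `16` is also `HashMap`'s default initial capacity, so `new HashMap<>(16)` behaves the same as the no-arg constructor. The initial capacity only affects when the first resize happens (at capacity × load factor, 0.75 by default), never how many entries the map can hold. A quick sketch illustrating this (class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class HashMapCapacityDemo {
  public static void main(String[] args) {
    // Starting at capacity 16 (the default) does not cap the map's size;
    // it merely delays the first resize until ~12 entries (16 * 0.75).
    Map<String, Integer> map = new HashMap<>(16);
    for (int i = 0; i < 100; i++) {
      map.put("key-" + i, i);  // the map resizes transparently as it grows
    }
    System.out.println(map.size());  // 100
  }
}
```

So a bigger number would only save a few rehash passes when the partitioner caches many partitions; it would not change behavior.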
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]