alexeykudinkin commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1087231730
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +80,94 @@ public static SparkHoodieBloomIndexHelper getInstance() {
public HoodiePairData<HoodieKey, HoodieRecordLocation>
findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable
hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
- HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+ HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
Map<String, Long> recordsPerPartition) {
- JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
- HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
- .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
int inputParallelism =
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
- int joinParallelism = Math.max(inputParallelism,
config.getBloomIndexParallelism());
- LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism:
${"
- + config.getBloomIndexParallelism() + "}");
+ int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
+ // NOTE: Target parallelism could be overridden by the config
+ int targetParallelism =
+ configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism
: inputParallelism;
+
+ LOG.info(String.format("Input parallelism: %d, Index parallelism: %d",
inputParallelism, targetParallelism));
+
+ JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD =
HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
if (config.getBloomIndexUseMetadata()
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
.contains(BLOOM_FILTERS.getPartitionPath())) {
- // Step 1: Sort by file id
- JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
- fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+ SerializableConfiguration hadoopConf = new
SerializableConfiguration(hoodieTable.getHadoopConf());
+
+ HoodieTableFileSystemView baseFileOnlyView =
+ getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+ Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+ ((HoodieSparkEngineContext)
context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+ // When leveraging MT we're aiming for following goals:
+ // - (G1) All requests to MT are made in batch (ie we're trying to
fetch all the values
+ // for corresponding keys at once)
+ // - (G2) Each task reads no more than just _one_ file-group from the
MT Bloom Filters
+ // partition
+ //
+      // To achieve G2, the following invariant has to be maintained: Spark
partitions have to be
+ // affine w/ Metadata Table's file-groups, meaning that each Spark
partition holds records
+ // belonging to one and only file-group in MT Bloom Filters partition.
To provide for that
+ // we need to make sure
+ // - Spark's used [[Partitioner]] employs same hashing function as
Metadata Table (as well
+ // as being applied to the same keys as the MT one)
+ // - Make sure that # of partitions is congruent to the # of
file-groups (ie number of Spark
+ // partitions is a multiple of the # of the file-groups).
+ //
+ // Last provision is necessary, so that for every key it's the case
that
+ //
+ // (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+ //
+ // Let's take an example of N = 8 and M = 4 (default # of file-groups
in Bloom Filter
+ // partition). In that case Spark partitions for which `hash(key) %
N` will be either 0
+ // or 4, will map to the same (first) file-group in MT
+ //
+ // To achieve G1, we drastically reduce # of RDD partitions actually
reading from MT, by
+ // setting target parallelism as a (low-factor) multiple of the # of the
file-groups in MT
+ int targetMetadataParallelism =
+ config.getMetadataConfig().getBloomFilterIndexFileGroupCount() *
config.getBloomIndexMetadataFetchingParallelismFactor();
+
+ AffineBloomIndexFileGroupPartitioner partitioner =
+ new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast,
targetMetadataParallelism);
+
+ keyLookupResultRDD =
+ // First, we need to repartition and sort records using
[[AffineBloomIndexFileGroupPartitioner]]
+ // to make sure every Spark task accesses no more than just a single
file-group in MT (allows
+ // us to achieve G2).
+ //
+ // NOTE: Sorting records w/in individual partitions is required to
make sure that we cluster
+ // together keys co-located w/in the MT files (sorted by keys)
+ fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner)
Review Comment:
This is because we cluster the files into batches of 256 per request to MT, and
we want to make sure we're not querying metadata for the same file multiple
times
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]