yihua commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1087220441
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +80,94 @@ public static SparkHoodieBloomIndexHelper getInstance() {
  public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
      HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
      HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
      Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
      Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
    int inputParallelism =
        HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
-        + config.getBloomIndexParallelism() + "}");
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
    if (config.getBloomIndexUseMetadata()
        && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
            .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for the following goals:
+      //   - (G1) All requests to MT are made in batch (ie we're trying to fetch all the values
+      //          for the corresponding keys at once)
+      //   - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters
+      //          partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ the Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in the MT Bloom Filters partition. To provide for
+      // that we need to make sure that
+      //   - The [[Partitioner]] used by Spark employs the same hashing function as the Metadata
+      //     Table (as well as being applied to the same keys as the MT one)
+      //   - The # of Spark partitions is a multiple of the # of the file-groups
+      //
+      // The last provision is necessary so that for every key it's the case that
+      //
+      //    (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+      //
+      // Let's take an example of N = 8 and M = 4 (the default # of file-groups in the Bloom Filter
+      // partition). In that case the Spark partitions for which `hash(key) % N` is either 0
+      // or 4 will map to the same (first) file-group in MT
+      //
+      // To achieve G1, we drastically reduce the # of RDD partitions actually reading from MT by
+      // setting the target parallelism as a (low-factor) multiple of the # of the file-groups in MT
+      int targetMetadataParallelism =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount() * config.getBloomIndexMetadataFetchingParallelismFactor();
+
+      AffineBloomIndexFileGroupPartitioner partitioner =
+          new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, targetMetadataParallelism);
+
+      keyLookupResultRDD =
+          // First, we need to repartition and sort records using [[AffineBloomIndexFileGroupPartitioner]]
+          // to make sure every Spark task accesses no more than just a single file-group in MT
+          // (allowing us to achieve G2).
+          //
+          // NOTE: Sorting records w/in individual partitions is required to make sure that we cluster
+          //       together keys co-located w/in the MT files (sorted by keys)
+          fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner)
Review Comment:
While the repartition is required for routing the partition and file ID
pairs of the same bloom_filter file group to the same Spark partition, there is
no need to sort within each Spark partition: the ordering has to be based on the
bloom filter index key, not on the partition and file ID pair used here, and
the sorting of bloom filter index keys is already done before the actual lookup
in the metadata table.
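
The congruence invariant quoted in the hunk above can be sanity-checked in
isolation. The sketch below is illustrative only: it uses raw int hashes and
the example N/M values from the comment, not Hudi's actual key-hashing or
file-group count.

```java
import java.util.stream.IntStream;

public class AffinityInvariantDemo {
    public static void main(String[] args) {
        int m = 4; // # of file-groups in the MT Bloom Filters partition (example value)
        int n = 8; // # of Spark partitions, deliberately chosen as a multiple of m

        // For every hash value h, (h % n) % m == h % m holds whenever n % m == 0,
        // so records routed to Spark partitions 0 and 4 both map to file-group 0, etc.
        boolean invariantHolds = IntStream.range(0, 1_000)
            .allMatch(h -> (h % n) % m == h % m);
        System.out.println("invariant holds for n=8, m=4: " + invariantHolds); // true

        // Counter-example: if n is NOT a multiple of m the affinity breaks,
        // e.g. h = 6: (6 % 6) % 4 == 0, but 6 % 4 == 2.
        int badN = 6;
        boolean brokenInvariant = IntStream.range(0, 1_000)
            .allMatch(h -> (h % badN) % m == h % m);
        System.out.println("invariant holds for n=6, m=4: " + brokenInvariant); // false
    }
}
```

This is why the comment insists the Spark partition count be a multiple of the
file-group count: only then does the partitioner's modulo compose cleanly with
the Metadata Table's own key-to-file-group hashing.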
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -113,7 +113,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
-    GenericRecord record = HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+    GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema);
Review Comment:
Could you avoid this irrelevant change in the PR? (Let's do it in a separate PR if needed)
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -807,16 +809,17 @@ private ClosableIteratorWithSchema<HoodieRecord> getRecordsIterator(
     Option<Pair<Function<HoodieRecord, HoodieRecord>, Schema>> schemaEvolutionTransformerOpt =
         composeEvolvedSchemaTransformer(dataBlock);
+
     // In case when schema has been evolved original persisted records will have to be
     // transformed to adhere to the new schema
-    if (schemaEvolutionTransformerOpt.isPresent()) {
-      return ClosableIteratorWithSchema.newInstance(
-          new CloseableMappingIterator<>(blockRecordsIterator,
-              schemaEvolutionTransformerOpt.get().getLeft()),
-          schemaEvolutionTransformerOpt.get().getRight());
-    } else {
-      return ClosableIteratorWithSchema.newInstance(blockRecordsIterator, dataBlock.getSchema());
-    }
+    Function<HoodieRecord, HoodieRecord> transformer =
Review Comment:
Similar here for irrelevant changes.
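
For context, the refactor under discussion collapses the if/else into a single
transformer resolved from the optional. The shape of that pattern, sketched
here with java.util.Optional and toy types rather than Hudi's Option and
HoodieRecord (all names below are hypothetical), is:

```java
import java.util.Optional;
import java.util.function.Function;

public class OptionalTransformerDemo {
    // A generic stand-in for the schema-evolution transformer: if a transformer
    // is present, apply it to each record; otherwise fall back to identity.
    static <T> Function<T, T> resolveTransformer(Optional<Function<T, T>> transformerOpt) {
        return transformerOpt.orElse(Function.identity());
    }

    public static void main(String[] args) {
        Function<String, String> upper = String::toUpperCase;

        // With a transformer present, records are rewritten...
        String evolved = resolveTransformer(Optional.of(upper)).apply("record");
        // ...and with none present, they pass through unchanged.
        String asIs = resolveTransformer(Optional.<Function<String, String>>empty()).apply("record");

        System.out.println(evolved); // RECORD
        System.out.println(asIs);    // record
    }
}
```

The functional form avoids duplicating the iterator-construction call in both
branches, at the cost of making the two code paths less explicit, which is why
the reviewer flags it as out of scope for this PR.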
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java:
##########
@@ -117,6 +117,11 @@ <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
*/
<O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
+ /**
+ * TODO java-doc
Review Comment:
reminder on the javadocs
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -73,11 +73,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
   private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class);
-  public static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  public static final int BUFFER_SIZE = 10 * 1024 * 1024;
+  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: The buffer size is deliberately set pretty low, since the MT internally relies
+  //       on HFile (serving as a persisted binary key-value mapping) to do caching
+  protected static final int BUFFER_SIZE = 10 * 1024; // 10KB
Review Comment:
Did you run any micro-benchmark to make sure there is no material performance change?
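
A proper comparison would use JMH; the rough, self-contained sketch below (all
names hypothetical, timing via System.nanoTime as a crude stand-in) only
illustrates the kind of check being asked for: reading the same file through
both buffer sizes and comparing byte counts and elapsed time.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferSizeCheck {
    // Reads a file fully through a BufferedInputStream with the given buffer size
    // and returns {total bytes read, elapsed nanos}.
    static long[] readAll(Path file, int bufferSize) throws IOException {
        long start = System.nanoTime();
        long total = 0;
        try (InputStream in = new BufferedInputStream(Files.newInputStream(file), bufferSize)) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                total += n;
            }
        }
        return new long[] {total, System.nanoTime() - start};
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("buffer-check", ".bin");
        Files.write(tmp, new byte[32 * 1024 * 1024]); // 32MB payload, illustrative

        long[] small = readAll(tmp, 10 * 1024);        // proposed 10KB buffer
        long[] large = readAll(tmp, 10 * 1024 * 1024); // previous 10MB buffer

        // Correctness is unchanged either way; only timing may differ.
        System.out.println("bytes read (10KB buffer): " + small[0]);
        System.out.println("bytes read (10MB buffer): " + large[0]);
        System.out.println("elapsed 10KB=" + small[1] + "ns, 10MB=" + large[1] + "ns");
        Files.delete(tmp);
    }
}
```

Note this measures raw stream buffering only; it says nothing about HFile's own
block cache, which is the reason the PR claims the large buffer is unnecessary.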
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]