alexeykudinkin commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1087203007
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -210,34 +210,35 @@ protected List<Pair<String, BloomIndexFileInfo>>
loadColumnRangesFromMetaIndex(
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices: " + config.getTableName());
- final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- return context.flatMap(partitions, partitionName -> {
- // Partition and file name pairs
- List<Pair<String, String>> partitionFileNameList =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
- hoodieTable).stream().map(baseFile -> Pair.of(partitionName,
baseFile.getFileName()))
- .sorted()
- .collect(toList());
- if (partitionFileNameList.isEmpty()) {
- return Stream.empty();
- }
- try {
- Map<Pair<String, String>, HoodieMetadataColumnStats>
fileToColumnStatsMap =
-
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
- List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
- for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry
: fileToColumnStatsMap.entrySet()) {
- result.add(Pair.of(entry.getKey().getLeft(),
- new BloomIndexFileInfo(
- FSUtils.getFileId(entry.getKey().getRight()),
- // NOTE: Here we assume that the type of the primary key
field is string
- (String)
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
- (String)
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
- )));
- }
- return result.stream();
- } catch (MetadataNotFoundException me) {
- throw new HoodieMetadataException("Unable to find column range
metadata for partition:" + partitionName, me);
- }
- }, Math.max(partitions.size(), 1));
+ String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+
+ // Partition and file name pairs
+ List<Pair<String, String>> partitionFileNameList =
+ HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions,
context, hoodieTable).stream()
+ .map(partitionBaseFilePair ->
Pair.of(partitionBaseFilePair.getLeft(),
partitionBaseFilePair.getRight().getFileName()))
+ .sorted()
+ .collect(toList());
+
+ if (partitionFileNameList.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
Review Comment:
   @yihua ultimately we'd still transfer the whole table view (i.e.
BloomIndexFileInfo) for the whole table to the driver (we're passing back the
List), so there's no change in that regard
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the function probing filtered in candidate keys provided
in
+ * {@link HoodieBloomFilterProbingResult} w/in corresponding files identified
by {@link HoodieFileGroupId}
+ * to validate whether the record w/ the provided key is indeed persisted in it
+ */
+public class HoodieFileProbingFunction implements
+ FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieFileProbingFunction.class);
+
+ // Assuming each file bloom filter takes up 512K, sizing the max file count
+ // per batch so that the total fetched bloom filters would not cross 128 MB.
+ private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+
+ private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+ private final SerializableConfiguration hadoopConf;
+
+ public HoodieFileProbingFunction(Broadcast<HoodieTableFileSystemView>
baseFileOnlyViewBroadcast,
+ SerializableConfiguration hadoopConf) {
+ this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+ this.hadoopConf = hadoopConf;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>>
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>
tuple2Iterator) throws Exception {
+ return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+ }
+
+ private class BloomIndexLazyKeyCheckIterator
+ extends LazyIterableIterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
+
+ public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>> tuple2Iterator) {
+ super(tuple2Iterator);
+ }
+
+ @Override
+ protected List<HoodieKeyLookupResult> computeNext() {
+ // Partition path and file name pair to list of keys
+ final Map<Pair<String, String>, HoodieBloomFilterProbingResult>
fileToLookupResults = new HashMap<>();
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+ while (inputItr.hasNext()) {
+ Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult> entry =
inputItr.next();
+ final String partitionPath = entry._1.getPartitionPath();
+ final String fileId = entry._1.getFileId();
+
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
+
baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+
+ fileToLookupResults.putIfAbsent(Pair.of(partitionPath,
fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+
+ if (fileToLookupResults.size() >
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+ break;
Review Comment:
   This is actually just a clustering mechanism -- all files will be checked in
the end (we will iterate through the whole iterator)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +80,94 @@ public static SparkHoodieBloomIndexHelper getInstance() {
public HoodiePairData<HoodieKey, HoodieRecordLocation>
findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable
hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
- HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+ HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
Map<String, Long> recordsPerPartition) {
- JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
- HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
- .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
int inputParallelism =
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
- int joinParallelism = Math.max(inputParallelism,
config.getBloomIndexParallelism());
- LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism:
${"
- + config.getBloomIndexParallelism() + "}");
+ int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
+ // NOTE: Target parallelism could be overridden by the config
+ int targetParallelism =
Review Comment:
   Correct. Previously we were taking `max(input, configured)`, and there was
essentially no way for the user to override it
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/AbstractIterator.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * TODO upstream
Review Comment:
This is already addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java:
##########
@@ -119,7 +118,16 @@ public <O> HoodieData<O>
mapPartitions(SerializableFunction<Iterator<T>, Iterato
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
- return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
+ // NOTE: Unrolling this lambda into a method reference results in
[[ClassCastException]]
+ // due to weird interop b/w Scala and Java
+ return HoodieJavaRDD.of(rddData.flatMap(x -> func.apply(x)));
+ }
Review Comment:
   Yeah, it's weird. Let me look up the history. According to the comment, this
should have been a class instantiation and not a lambda, but it is one
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -289,11 +290,13 @@ HoodieData<Pair<String, HoodieKey>>
explodeRecordsWithFileComparisons(
String recordKey = partitionRecordKeyPair.getRight();
String partitionPath = partitionRecordKeyPair.getLeft();
- return indexFileFilter.getMatchingFilesAndPartition(partitionPath,
recordKey).stream()
- .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new
ImmutablePair<>(partitionFileIdPair.getRight(),
- new HoodieKey(recordKey, partitionPath)))
- .collect(Collectors.toList());
- }).flatMap(List::iterator);
+ return indexFileFilter.getMatchingFilesAndPartition(partitionPath,
recordKey)
Review Comment:
   Yes. It slightly improves the canonical one as well
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Implementation of the function that probing Bloom Filters of individual
files verifying
+ * whether particular record key could be stored in the latest file-slice of
the file-group
+ * identified by the {@link HoodieFileGroupId}
+ */
+public class HoodieMetadataBloomFilterProbingFunction implements
+ PairFlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>,
HoodieFileGroupId, HoodieBloomFilterProbingResult> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieMetadataBloomFilterProbingFunction.class);
+
+ // Assuming each file bloom filter takes up 512K, sizing the max file count
+ // per batch so that the total fetched bloom filters would not cross 128 MB.
+ private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+ private final HoodieTable hoodieTable;
+
+ private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+
+ /**
+ * NOTE: It's critical for this ctor to accept {@link HoodieTable} to make
sure that it uses
+ * broadcast-ed instance of {@link HoodieBackedTableMetadata}
internally, instead of
+ * one being serialized and deserialized for _every_ task individually
+ *
+ * NOTE: We pass in broadcasted {@link HoodieTableFileSystemView} to make
sure it's materialized
+ * on executor once
+ */
+ public
HoodieMetadataBloomFilterProbingFunction(Broadcast<HoodieTableFileSystemView>
baseFileOnlyViewBroadcast,
+ HoodieTable hoodieTable) {
+ this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+ this.hoodieTable = hoodieTable;
+ }
+
+ @Override
+ public Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>
call(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) throws
Exception {
+ return new FlattenedIterator<>(new
BloomIndexLazyKeyCheckIterator(tuple2Iterator), Function.identity());
+ }
+
+ private class BloomIndexLazyKeyCheckIterator
+ extends LazyIterableIterator<Tuple2<HoodieFileGroupId, String>,
Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>> {
+
+ public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId,
String>> tuple2Iterator) {
+ super(tuple2Iterator);
+ }
+
+ @Override
+ protected Iterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>> computeNext() {
+ // Partition path and file name pair to list of keys
+ final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new
HashMap<>();
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+ while (inputItr.hasNext()) {
+ Tuple2<HoodieFileGroupId, String> entry = inputItr.next();
+ String partitionPath = entry._1.getPartitionPath();
+ String fileId = entry._1.getFileId();
+
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+
+ fileToKeysMap.computeIfAbsent(Pair.of(partitionPath,
fileIDBaseFileMap.get(fileId).getFileName()),
+ k -> new ArrayList<>()).add(new HoodieKey(entry._2,
partitionPath));
+
+ if (fileToKeysMap.size() >
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+ break;
Review Comment:
Responded above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]