vinothchandar commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r787235419
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1432,6 +1432,18 @@ public boolean useBloomIndexBucketizedChecking() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}
+ public boolean isMetaIndexBloomFilterEnabled() {
+ return isMetadataTableEnabled() &&
getMetadataConfig().isMetaIndexBloomFilterEnabled();
+ }
+
+ public boolean isMetaIndexColumnStatsForAllColumns() {
+ return isMetadataTableEnabled() &&
getMetadataConfig().isMetaIndexColumnStatsForAllColumns();
+ }
+
+ public boolean isMetaIndexBloomFilterBatchLoadEnabled() {
+ return isMetadataTableEnabled() &&
getMetadataConfig().isMetaIndexBloomFilterBatchLoadEnabled();
Review comment:
We should remove this config and have one performant way of doing the
bloom checks.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1432,6 +1432,18 @@ public boolean useBloomIndexBucketizedChecking() {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}
+ public boolean isMetaIndexBloomFilterEnabled() {
+ return isMetadataTableEnabled() &&
getMetadataConfig().isMetaIndexBloomFilterEnabled();
+ }
+
+ public boolean isMetaIndexColumnStatsForAllColumns() {
+ return isMetadataTableEnabled() &&
getMetadataConfig().isMetaIndexColumnStatsForAllColumns();
+ }
+
+ public boolean isMetaIndexBloomFilterBatchLoadEnabled() {
Review comment:
Let's keep all naming consistent: Meta vs. Metadata. Since we call it the Hudi
Metadata table, let's name everything `MetadataIndex`.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,37 @@ protected void commit(HoodieData<HoodieRecord>
hoodieDataRecords, String partiti
/**
* Tag each record with the location in the given partition.
- *
+ * <p>
* The record is tagged with respective file slice's location based on its
record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD,
String partitionName, int numFileGroups) {
- List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
- ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
-
- return recordsRDD.map(r -> {
- FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
- r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
- return r;
- });
+ private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
Review comment:
I went over the caller once. Let's look at the Spark DAG and ensure no
double compute happens when preparing the RDD for each partition.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
##########
@@ -22,27 +22,53 @@
import java.util.List;
public enum MetadataPartitionType {
- FILES("files", "files-");
+ // TODO: Make the file group count configurable or auto compute based on the
scaling needs
+ FILES(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 1),
+ COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS,
"col-stats-", 1),
+ BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS,
"bloom-filters-", 1);
Review comment:
We need to make this configurable and make it work for multiple file
groups? We need a JIRA for this.
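
To make the "multiple file groups" point concrete, here is a minimal sketch of
routing a record key to one of N file groups. `FileGroupMapper` and its hash
choice are hypothetical and only for illustration; this is not necessarily how
`HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex` hashes keys.

```java
// Hypothetical helper, not part of Hudi.
final class FileGroupMapper {
  private FileGroupMapper() {
  }

  // Maps a record key to a file group index in [0, fileGroupCount).
  // Assumption: String.hashCode() is a good enough hash for this sketch.
  static int fileGroupIndex(String recordKey, int fileGroupCount) {
    if (fileGroupCount <= 0) {
      throw new IllegalArgumentException("fileGroupCount must be positive: " + fileGroupCount);
    }
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(recordKey.hashCode(), fileGroupCount);
  }
}
```

With a count of 1 every key lands in file group 0, which matches the hardcoded
value in this diff; a configurable count would only change the second argument.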
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
/**
* Load all involved files as <Partition, filename> pair List.
*/
- List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
- if (config.getBloomIndexPruneByRanges()) {
- // also obtain file ranges, if range pruning is enabled
- context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
- return context.map(partitionPathFileIDList, pf -> {
- try {
- HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
- String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
- } catch (MetadataNotFoundException me) {
- LOG.warn("Unable to find range metadata in file :" + pf);
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+ return context.map(partitionPathFileIDList, pf -> {
+ try {
+ HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
+ String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
+ } catch (MetadataNotFoundException me) {
+ LOG.warn("Unable to find range metadata in file :" + pf);
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ }
+ }, Math.max(partitionPathFileIDList.size(), 1));
+ }
+
+ /**
+ * Get the latest base files for the requested partitions.
+ *
+ * @param partitions - List of partitions to get the base files for
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie Table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context,
+ hoodieTable).stream()
+ .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+ .collect(toList());
+ return partitionPathFileIDList.stream()
+ .map(pf -> Pair.of(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+ }
+
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, partitionName -> {
Review comment:
Does all of this handle unpartitioned tables?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
/**
* Load all involved files as <Partition, filename> pair List.
*/
- List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
- if (config.getBloomIndexPruneByRanges()) {
- // also obtain file ranges, if range pruning is enabled
- context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
- return context.map(partitionPathFileIDList, pf -> {
- try {
- HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
- String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
- } catch (MetadataNotFoundException me) {
- LOG.warn("Unable to find range metadata in file :" + pf);
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+ return context.map(partitionPathFileIDList, pf -> {
+ try {
+ HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
+ String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
+ } catch (MetadataNotFoundException me) {
+ LOG.warn("Unable to find range metadata in file :" + pf);
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ }
+ }, Math.max(partitionPathFileIDList.size(), 1));
+ }
+
+ /**
+ * Get the latest base files for the requested partitions.
+ *
+ * @param partitions - List of partitions to get the base files for
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie Table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context,
+ hoodieTable).stream()
+ .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+ .collect(toList());
+ return partitionPathFileIDList.stream()
+ .map(pf -> Pair.of(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+ }
+
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, partitionName -> {
+ List<String> partitionFileNameList =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+ hoodieTable).stream().map(baseFile -> baseFile.getFileName())
+ .collect(toList());
+ try {
+ List<Pair<String, String>> columnStatKeys = new ArrayList<>();
+ for (String fileName : partitionFileNameList) {
+ Pair<String, String> partitionFileNameKey = Pair.of(partitionName,
fileName);
+ columnStatKeys.add(partitionFileNameKey);
}
- }, Math.max(partitionPathFileIDList.size(), 1));
- } else {
- return partitionPathFileIDList.stream()
- .map(pf -> Pair.of(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
- }
+ if (columnStatKeys.isEmpty()) {
+ return Stream.empty();
+ }
+
+ Collections.sort(columnStatKeys);
+ Map<Pair<String, String>, HoodieColumnStats> fileToColumnStatMap =
hoodieTable
+ .getMetadataTable().getColumnStats(columnStatKeys, keyField);
Review comment:
nit: `Pair<String, String>` is a less-than-ideal API for a partition path and
file name. Let's go with `getColumnStats(Option<String> partitionPath,
String fileName)`.
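
One way to carry the same idea through the batched lookup is a small typed key
instead of the pair. This is only a sketch: `ColumnStatsKey` is a hypothetical
name, and `java.util.Optional` stands in for Hudi's `Option` so the snippet
compiles on its own.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical value type, not part of Hudi.
final class ColumnStatsKey implements Comparable<ColumnStatsKey> {
  private final Optional<String> partitionPath; // empty for unpartitioned tables
  private final String fileName;

  ColumnStatsKey(Optional<String> partitionPath, String fileName) {
    this.partitionPath = Objects.requireNonNull(partitionPath, "partitionPath");
    this.fileName = Objects.requireNonNull(fileName, "fileName");
  }

  Optional<String> partitionPath() {
    return partitionPath;
  }

  String fileName() {
    return fileName;
  }

  // Orders by partition path first (empty sorts first), then by file name,
  // so a list of keys can still be sorted before the lookup.
  @Override
  public int compareTo(ColumnStatsKey other) {
    int byPartition = partitionPath.orElse("").compareTo(other.partitionPath.orElse(""));
    return byPartition != 0 ? byPartition : fileName.compareTo(other.fileName);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ColumnStatsKey)) {
      return false;
    }
    ColumnStatsKey that = (ColumnStatsKey) o;
    return partitionPath.equals(that.partitionPath) && fileName.equals(that.fileName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(partitionPath, fileName);
  }
}
```

The optional partition path also makes the unpartitioned case explicit in the
signature rather than relying on an empty string convention.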
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,37 @@ protected void commit(HoodieData<HoodieRecord>
hoodieDataRecords, String partiti
/**
* Tag each record with the location in the given partition.
- *
+ * <p>
* The record is tagged with respective file slice's location based on its
record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD,
String partitionName, int numFileGroups) {
- List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
- ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
-
- return recordsRDD.map(r -> {
- FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
- r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
- return r;
- });
+ private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
+ // The result set
+ JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
+
+ for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry :
partitionRecordsMap.entrySet()) {
+ final String partitionName = entry.getKey().partitionPath();
+ final int fileGroupCount = entry.getKey().getFileGroupCount();
+ HoodieData<HoodieRecord> records = entry.getValue();
+ JavaRDD<HoodieRecord> recordsRDD = (JavaRDD<HoodieRecord>) records.get();
+
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
+ ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
+ String.format("Invalid number of file groups: found=%d,
required=%d", fileSlices.size(), fileGroupCount));
+
+ JavaRDD<HoodieRecord> rddSinglePartitionRecords = recordsRDD.map(r -> {
+ FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
+ fileGroupCount));
+ r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+ return r;
+ });
+
+ if (rddAllPartitionRecords == null) {
Review comment:
No "null"s as sentinels
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexLazyCheckFunction.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.io.HoodieKeyLookupHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in a lazy fashion of file by file.
+ */
+public class HoodieBloomMetaIndexLazyCheckFunction implements
Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexLazyCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexLazyCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ return new LazyKeyCheckIterator(tuple2Iterator);
+ }
+
+ class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String,
HoodieKey>, List<HoodieKeyLookupResult>> {
+
+ private HoodieKeyLookupHandle keyLookupHandle;
+
+ LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>>
filePartitionRecordKeyTripletItr) {
+ super(filePartitionRecordKeyTripletItr);
+ }
+
+ @Override
+ protected void start() {
+ }
+
+ @Override
+ protected List<HoodieKeyLookupResult> computeNext() {
+
+ List<HoodieKeyLookupResult> ret = new ArrayList<>();
+ try {
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+ while (inputItr.hasNext()) {
+ Tuple2<String, HoodieKey> currentTuple = inputItr.next();
+ final String partitionPath = currentTuple._2.getPartitionPath();
+ final String recordKey = currentTuple._2.getRecordKey();
+ final String fileId = currentTuple._1;
+ ValidationUtils.checkState(!fileId.isEmpty());
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+ final Pair<String, String> partitionPathFileIdPair =
Pair.of(partitionPath, fileId);
+
+ // lazily init state
+ if (keyLookupHandle == null) {
+ keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable,
partitionPathFileIdPair,
Review comment:
I think the issue here is that we open the metadata table again and
again each time. Why can't we open the bloom partition once here and pass it
into the handle? That way we only incur the merge cost once?
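
Roughly the shape I have in mind, as a plain-Java sketch. `BloomPartitionReader`
and `LookupHandle` are hypothetical stand-ins, not existing Hudi classes, and
the bloom filter is modeled as a plain string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the bloom filter partition, opened (and merged) once.
final class BloomPartitionReader {
  private final Map<String, String> filtersByFileName;

  BloomPartitionReader(Map<String, String> filtersByFileName) {
    // In the real code this is where the one-time open/merge cost would be paid.
    this.filtersByFileName = new HashMap<>(filtersByFileName);
  }

  Optional<String> bloomFilterFor(String fileName) {
    return Optional.ofNullable(filtersByFileName.get(fileName));
  }
}

// Hypothetical handle that receives the already-open reader instead of reopening it.
final class LookupHandle {
  private final String serializedFilter;

  LookupHandle(BloomPartitionReader sharedReader, String fileName) {
    this.serializedFilter = sharedReader.bloomFilterFor(fileName)
        .orElseThrow(() -> new IllegalStateException("Bloom filter missing for " + fileName));
  }

  String serializedFilter() {
    return serializedFilter;
  }

  // Builds one handle per file while reusing the same reader for all of them.
  static List<LookupHandle> forFiles(BloomPartitionReader sharedReader, List<String> fileNames) {
    List<LookupHandle> handles = new ArrayList<>();
    for (String fileName : fileNames) {
      handles.add(new LookupHandle(sharedReader, fileName));
    }
    return handles;
  }
}
```

The handle constructor takes the reader as a dependency, so the per-file cost is
a map lookup rather than another open of the metadata table.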
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,37 @@ protected void commit(HoodieData<HoodieRecord>
hoodieDataRecords, String partiti
/**
* Tag each record with the location in the given partition.
- *
+ * <p>
* The record is tagged with respective file slice's location based on its
record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD,
String partitionName, int numFileGroups) {
- List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
- ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
-
- return recordsRDD.map(r -> {
- FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
- r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
- return r;
- });
+ private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
+ // The result set
+ JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
Review comment:
Can't we use an empty RDD here?
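
A sketch of the pattern with plain Java lists standing in for RDDs, so it runs
without Spark. `UnionWithoutNullSentinel` is a hypothetical name; in the Spark
code the analogous starting point would be an empty RDD (for example from
`JavaSparkContext.emptyRDD()`) followed by `union`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Hudi code.
final class UnionWithoutNullSentinel {
  private UnionWithoutNullSentinel() {
  }

  // Starts from an empty result (the identity for union) instead of null,
  // so the loop body needs no "is this the first partition?" branch.
  static List<String> unionAll(Map<String, List<String>> recordsByPartition) {
    List<String> allRecords = new ArrayList<>();
    for (List<String> partitionRecords : recordsByPartition.values()) {
      allRecords.addAll(partitionRecords);
    }
    return allRecords;
  }
}
```

An empty input map then yields an empty result rather than `null`, which also
removes a null check from the caller.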
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -86,20 +125,63 @@ public HoodieMetadataPayload(Option<GenericRecord> record)
{
// https://issues.apache.org/jira/browse/AVRO-1811
key = record.get().get(SCHEMA_FIELD_ID_KEY).toString();
type = (int) record.get().get(SCHEMA_FIELD_ID_TYPE);
- if (record.get().get(SCHEMA_FIELD_ID_METADATA) != null) {
- filesystemMetadata = (Map<String, HoodieMetadataFileInfo>)
record.get().get("filesystemMetadata");
+ if (record.get().get(SCHEMA_FIELD_ID_FILESYSTEM) != null) {
+ filesystemMetadata = (Map<String, HoodieMetadataFileInfo>)
record.get().get(SCHEMA_FIELD_ID_FILESYSTEM);
filesystemMetadata.keySet().forEach(k -> {
GenericRecord v = filesystemMetadata.get(k);
filesystemMetadata.put(k.toString(), new
HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted")));
});
}
+
+ if (type == METADATA_TYPE_BLOOM_FILTER) {
Review comment:
We need to pull the merging for each partition type into its own payload
class and keep this class just as a delegator. Let's do this in this PR.
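
A minimal sketch of the delegator shape, assuming string payloads and made-up
merge rules purely for illustration. `DelegatingPayloadMerger` and its nested
`Type` enum are hypothetical and do not mirror the actual payload schema.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical delegator, not part of Hudi.
final class DelegatingPayloadMerger {
  enum Type { FILES, BLOOM_FILTER, COLUMN_STATS }

  private final Map<Type, BinaryOperator<String>> mergers = new EnumMap<>(Type.class);

  DelegatingPayloadMerger() {
    // Placeholder rules: each type owns its merge logic; the delegator only dispatches.
    mergers.put(Type.FILES, (older, newer) -> older + "+" + newer);
    mergers.put(Type.BLOOM_FILTER, (older, newer) -> newer);
    mergers.put(Type.COLUMN_STATS, (older, newer) -> newer);
  }

  String merge(Type type, String older, String newer) {
    BinaryOperator<String> merger = mergers.get(type);
    if (merger == null) {
      throw new IllegalArgumentException("No merger registered for type " + type);
    }
    return merger.apply(older, newer);
  }
}
```

Adding a new partition type then means registering one more merger instead of
growing another `if (type == ...)` branch in the payload class.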
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,54 @@
private static final Logger LOG =
LogManager.getLogger(HoodieKeyLookupHandle.class);
- private final HoodieTableType tableType;
-
private final BloomFilter bloomFilter;
-
private final List<String> candidateRecordKeys;
-
+ private final boolean useMetadataTableIndex;
+ private Option<String> fileName = Option.empty();
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
- Pair<String, String> partitionPathFilePair) {
- super(config, null, hoodieTable, partitionPathFilePair);
- this.tableType = hoodieTable.getMetaClient().getTableType();
+ Pair<String, String> partitionPathFileIDPair) {
+ this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+ }
+
+ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
+ Pair<String, String> partitionPathFileIDPair,
Option<String> fileName,
+ boolean useMetadataTableIndex) {
+ super(config, null, hoodieTable, partitionPathFileIDPair);
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
- HoodieTimer timer = new HoodieTimer().startTimer();
-
- try {
- this.bloomFilter = createNewFileReader().readBloomFilter();
- } catch (IOException e) {
- throw new HoodieIndexException(String.format("Error reading bloom filter
from %s: %s", partitionPathFilePair, e));
+ if (fileName.isPresent()) {
+
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+ this.fileName = fileName;
}
- LOG.info(String.format("Read bloom filter from %s in %d ms",
partitionPathFilePair, timer.endTimer()));
+ this.useMetadataTableIndex = useMetadataTableIndex;
+ this.bloomFilter = getBloomFilter();
}
- /**
- * Given a list of row keys and one file, return only row keys existing in
that file.
- */
- public List<String> checkCandidatesAgainstFile(Configuration configuration,
List<String> candidateRecordKeys,
- Path filePath) throws
HoodieIndexException {
- List<String> foundRecordKeys = new ArrayList<>();
+ private BloomFilter getBloomFilter() {
+ BloomFilter bloomFilter = null;
+ HoodieTimer timer = new HoodieTimer().startTimer();
try {
- // Load all rowKeys from the file, to double-confirm
- if (!candidateRecordKeys.isEmpty()) {
- HoodieTimer timer = new HoodieTimer().startTimer();
- Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new
HashSet<>(candidateRecordKeys));
- foundRecordKeys.addAll(fileRowKeys);
- LOG.info(String.format("Checked keys against file %s, in %d ms.
#candidates (%d) #found (%d)", filePath,
- timer.endTimer(), candidateRecordKeys.size(),
foundRecordKeys.size()));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Keys matching for file " + filePath + " => " +
foundRecordKeys);
+ if (this.useMetadataTableIndex) {
+ ValidationUtils.checkArgument(this.fileName.isPresent());
+ Option<ByteBuffer> bloomFilterByteBuffer =
+
hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(),
fileName.get());
+ if (!bloomFilterByteBuffer.isPresent()) {
+ throw new HoodieIndexException("BloomFilter missing for " +
partitionPathFileIDPair.getRight());
}
+ bloomFilter =
+ new
HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
+ BloomFilterTypeCode.DYNAMIC_V0);
+ } else {
+ bloomFilter = createNewFileReader().readBloomFilter();
Review comment:
Do we close this reader? It should be auto-closeable.
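
If the reader is (or can be made) `AutoCloseable`, try-with-resources would
close it even when reading throws. A self-contained sketch with hypothetical
stand-in classes; `TrackedReader` is not the actual Hudi file reader.

```java
// Hypothetical reader that records whether it has been closed.
final class TrackedReader implements AutoCloseable {
  private boolean closed = false;

  String readBloomFilter() {
    if (closed) {
      throw new IllegalStateException("reader is closed");
    }
    return "serialized-bloom-filter";
  }

  @Override
  public void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

final class BloomFilterLoader {
  private BloomFilterLoader() {
  }

  // Reads the filter and always closes the reader, on success or on exception.
  static String load(TrackedReader reader) {
    try (TrackedReader r = reader) {
      return r.readBloomFilter();
    }
  }
}
```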
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log
records. If disabled, hudi does look up of only interested entries.");
+ public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER =
ConfigProperty
Review comment:
All the docs here need to be tightened up to match Hudi terminology.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexLazyCheckFunction.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.io.HoodieKeyLookupHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in a lazy fashion of file by file.
+ */
+public class HoodieBloomMetaIndexLazyCheckFunction implements
Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
Review comment:
We need to rein in the code duplication in these classes.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +144,89 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
/**
* Load all involved files as <Partition, filename> pair List.
*/
- List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
- if (config.getBloomIndexPruneByRanges()) {
- // also obtain file ranges, if range pruning is enabled
- context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
- return context.map(partitionPathFileIDList, pf -> {
- try {
- HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
- String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
- } catch (MetadataNotFoundException me) {
- LOG.warn("Unable to find range metadata in file :" + pf);
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+ return context.map(partitionPathFileIDList, pf -> {
+ try {
+ HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
+ String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
+ } catch (MetadataNotFoundException me) {
+ LOG.warn("Unable to find range metadata in file :" + pf);
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ }
+ }, Math.max(partitionPathFileIDList.size(), 1));
+ }
+
+ /**
+ * Get the latest base files for the requested partitions.
+ *
+ * @param partitions - List of partitions to get the base files for
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie Table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
Review comment:
Also rename. It's a bit misleading that it's called getLatestBaseFiles*
when it's returning BloomIndexFileInfo.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,54 @@
private static final Logger LOG =
LogManager.getLogger(HoodieKeyLookupHandle.class);
- private final HoodieTableType tableType;
-
private final BloomFilter bloomFilter;
-
private final List<String> candidateRecordKeys;
-
+ private final boolean useMetadataTableIndex;
+ private Option<String> fileName = Option.empty();
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
- Pair<String, String> partitionPathFilePair) {
- super(config, null, hoodieTable, partitionPathFilePair);
- this.tableType = hoodieTable.getMetaClient().getTableType();
+ Pair<String, String> partitionPathFileIDPair) {
+ this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+ }
+
+ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
+ Pair<String, String> partitionPathFileIDPair,
Option<String> fileName,
+ boolean useMetadataTableIndex) {
+ super(config, null, hoodieTable, partitionPathFileIDPair);
Review comment:
Yes, let's not use `null` anywhere new in this PR.