nsivabalan commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r786179763
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -111,13 +116,19 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
HoodiePairData<String, String> partitionRecordKeyPairs, final
HoodieEngineContext context,
final HoodieTable hoodieTable) {
- // Obtain records per partition, in the incoming records
+ // Step 1: Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition =
partitionRecordKeyPairs.countByKey();
List<String> affectedPartitionPathList = new
ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
- List<Pair<String, BloomIndexFileInfo>> fileInfoList =
- loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
+ List<Pair<String, BloomIndexFileInfo>> fileInfoList;
+ if (config.getBloomIndexPruneByRanges()) {
+ fileInfoList =
(config.getMetadataConfig().isMetaIndexColumnStatsEnabled()
+ ? loadColumnStats(affectedPartitionPathList, context, hoodieTable)
Review comment:
Can we rename `loadColumnStats` to `loadInvolvedFilesFromMetadata` or
something similar?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -140,23 +151,83 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
- if (config.getBloomIndexPruneByRanges()) {
- // also obtain file ranges, if range pruning is enabled
- context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
- return context.map(partitionPathFileIDList, pf -> {
- try {
- HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
- String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
- } catch (MetadataNotFoundException me) {
- LOG.warn("Unable to find range metadata in file :" + pf);
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+ return context.map(partitionPathFileIDList, pf -> {
+ try {
+ HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
+ String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
+ } catch (MetadataNotFoundException me) {
+ LOG.warn("Unable to find range metadata in file :" + pf);
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ }
+ }, Math.max(partitionPathFileIDList.size(), 1));
+ }
+
+ /**
+ * Get the latest base files for the requested partitions.
+ *
+ * @param partitions - List of partitions to get the base files for
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie Table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context,
+ hoodieTable).stream()
+ .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+ .collect(toList());
+ return partitionPathFileIDList.stream()
+ .map(pf -> Pair.of(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+ }
+
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, partitionName -> {
+ List<Pair<String, String>> partitionFileIdList =
Review comment:
fileIdFileNameList
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+ Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+ while (tuple2Iterator.hasNext()) {
+ Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+ final String partitionPath = entry._2.getPartitionPath();
+ final String fileId = entry._1;
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
Review comment:
Wondering why we need fileIDBaseFileMap. For a given task, we may not
get repeated files, right? So why not go directly from fileId to
getLatestBaseFile and then to the fileName? Am I missing something here?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+ Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+ while (tuple2Iterator.hasNext()) {
+ Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+ final String partitionPath = entry._2.getPartitionPath();
+ final String fileId = entry._1;
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+ fileToKeysMap.computeIfAbsent(Pair.of(partitionPath,
fileIDBaseFileMap.get(fileId).getFileName()),
+ k -> new ArrayList<>()).add(entry._2);
+ }
+ if (fileToKeysMap.isEmpty()) {
+ return Collections.emptyListIterator();
+ }
+
+ List<Pair<String, String>> partitionNameFileNameList =
+ fileToKeysMap.keySet().stream().map(partitionNameFileNamePair -> {
+ return Pair.of(partitionNameFileNamePair.getLeft(),
partitionNameFileNamePair.getRight());
+ }).collect(Collectors.toList());
+
+ Map<Pair<String, String>, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+ fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {
+ final String partitionPath = partitionPathFileIdPair.getLeft();
+ final String fileName = partitionPathFileIdPair.getRight();
+ final Pair<String, String> partitionFileNamePair =
Pair.of(partitionPath, fileName);
Review comment:
Why do we need this variable explicitly? We can try to use
partitionPathFileIdPair as is.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
.map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
- Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
- config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD,
context);
- int inputParallelism =
-
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+ int inputParallelism =
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
int joinParallelism = Math.max(inputParallelism,
config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism:
${"
+ config.getBloomIndexParallelism() + "}");
- if (config.useBloomIndexBucketizedChecking()) {
+ JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+ if (config.isMetaIndexBloomFilterEnabled()) {
+ // Step 1: Sort by file id
+ JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+ fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);
Review comment:
Minor: we can also use the method reference Tuple2::_1 instead of the lambda entry -> entry._1.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -171,17 +172,37 @@ protected void commit(HoodieData<HoodieRecord>
hoodieDataRecords, String partiti
/**
* Tag each record with the location in the given partition.
- *
+ * <p>
* The record is tagged with respective file slice's location based on its
record key.
*/
- private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD,
String partitionName, int numFileGroups) {
- List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
- ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
-
- return recordsRDD.map(r -> {
- FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
- r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
- return r;
- });
+ private JavaRDD<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
+ // The result set
+ JavaRDD<HoodieRecord> rddAllPartitionRecords = null;
+
+ for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry :
partitionRecordsMap.entrySet()) {
+ final String partitionName = entry.getKey().partitionPath();
+ final int fileGroupCount = entry.getKey().getFileGroupCount();
+ HoodieData<HoodieRecord> records = entry.getValue();
+ JavaRDD<HoodieRecord> recordsRDD = (JavaRDD<HoodieRecord>) records.get();
+
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
+ ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
+ String.format("Invalid number of file groups: found=%d,
required=%d", fileSlices.size(), fileGroupCount));
+
+ JavaRDD<HoodieRecord> rddSinglePartitionRecords = recordsRDD.map(r -> {
Review comment:
Probably it's intentional, just wanted to confirm. We have a map of
partition type to the list of records to be prepped. So we have a choice:
either parallelize across different partition types and go sequential for all
records within a partition, or go sequential across different partition types
and parallelize across different records within each partition. We are going
with the latter.
L182 to L198
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +129,43 @@ private void initIfNeeded() {
return recordsByKeys.size() == 0 ? Option.empty() :
recordsByKeys.get(0).getValue();
}
- protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys, String partitionName) {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReadersIfNeeded(keys.get(0), partitionName);
- try {
- List<Long> timings = new ArrayList<>();
- HoodieFileReader baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner =
readers.getRight();
+ @Override
+ protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys,
+
String partitionName) {
+ Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
getPartitionFileSlices(partitionName, keys);
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
new ArrayList<>();
+ AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+ partitionFileSliceToKeysMap.forEach((partitionFileSlicePair,
fileSliceKeys) -> {
Review comment:
Based on an offline sync-up: in this patch we have only one file group for
every partition in the metadata table. This needs to be revisited in a
follow-up patch where we add more file groups for the column stats and bloom
index partitions.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -140,23 +151,83 @@ public HoodieBloomIndex(HoodieWriteConfig config,
BaseHoodieBloomIndexHelper blo
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
- if (config.getBloomIndexPruneByRanges()) {
- // also obtain file ranges, if range pruning is enabled
- context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
- return context.map(partitionPathFileIDList, pf -> {
- try {
- HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
- String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
- } catch (MetadataNotFoundException me) {
- LOG.warn("Unable to find range metadata in file :" + pf);
- return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ context.setJobStatus(this.getClass().getName(), "Obtain key ranges for
file slices (range pruning=on)");
+ return context.map(partitionPathFileIDList, pf -> {
+ try {
+ HoodieRangeInfoHandle rangeInfoHandle = new
HoodieRangeInfoHandle(config, hoodieTable, pf);
+ String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(),
minMaxKeys[0], minMaxKeys[1]));
+ } catch (MetadataNotFoundException me) {
+ LOG.warn("Unable to find range metadata in file :" + pf);
+ return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+ }
+ }, Math.max(partitionPathFileIDList.size(), 1));
+ }
+
+ /**
+ * Get the latest base files for the requested partitions.
+ *
+ * @param partitions - List of partitions to get the base files for
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie Table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> getLatestBaseFilesForPartitions(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ List<Pair<String, String>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context,
+ hoodieTable).stream()
+ .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+ .collect(toList());
+ return partitionPathFileIDList.stream()
+ .map(pf -> Pair.of(pf.getKey(), new
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+ }
+
+ /**
+ * Load the column stats index as BloomIndexFileInfo for all the involved
files in the partition.
+ *
+ * @param partitions - List of partitions for which column stats need to be
loaded
+ * @param context - Engine context
+ * @param hoodieTable - Hoodie table
+ * @return List of partition and file column range info pairs
+ */
+ List<Pair<String, BloomIndexFileInfo>> loadColumnStats(
+ List<String> partitions, final HoodieEngineContext context, final
HoodieTable hoodieTable) {
+ // also obtain file ranges, if range pruning is enabled
+ context.setJobStatus(this.getClass().getName(), "Load meta index key
ranges for file slices");
+
+ final String keyField =
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+ return context.flatMap(partitions, partitionName -> {
+ List<Pair<String, String>> partitionFileIdList =
Review comment:
By the way, do we even need fileId? I don't see it being used anywhere. Can
we just fetch fileName only?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -101,4 +116,34 @@ public static HoodieRecord getTaggedRecord(HoodieRecord
inputRecord, Option<Hood
}
return record;
}
+
+ /**
+ * Given a list of row keys and one file, return only row keys existing in
that file.
+ *
+ * @param filePath - File to filter keys from
+ * @param candidateRecordKeys - Candidate keys to filter
+ * @return List of candidate keys that are available in the file
+ */
+ public static List<String> filterKeysFromFile(Path filePath, List<String>
candidateRecordKeys,
Review comment:
Do we have unit tests (UTs) for this?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,57 @@
private static final Logger LOG =
LogManager.getLogger(HoodieKeyLookupHandle.class);
- private final HoodieTableType tableType;
-
private final BloomFilter bloomFilter;
-
private final List<String> candidateRecordKeys;
-
+ private final Option<String> fileName;
+ private final boolean useMetadataTableIndex;
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
- Pair<String, String> partitionPathFilePair) {
- super(config, null, hoodieTable, partitionPathFilePair);
- this.tableType = hoodieTable.getMetaClient().getTableType();
+ Pair<String, String> partitionPathFileIDPair) {
+ this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+ }
+
+ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
+ Pair<String, String> partitionPathFileIDPair,
Option<String> fileName,
+ boolean useMetadataTableIndex) {
+ super(config, null, hoodieTable, partitionPathFileIDPair);
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
- HoodieTimer timer = new HoodieTimer().startTimer();
-
- try {
- this.bloomFilter = createNewFileReader().readBloomFilter();
- } catch (IOException e) {
- throw new HoodieIndexException(String.format("Error reading bloom filter
from %s: %s", partitionPathFilePair, e));
+ if (fileName.isPresent()) {
+
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+ this.fileName = fileName;
+ } else {
+ this.fileName = Option.empty();
}
- LOG.info(String.format("Read bloom filter from %s in %d ms",
partitionPathFilePair, timer.endTimer()));
+ this.useMetadataTableIndex = useMetadataTableIndex;
+ this.bloomFilter = getBloomFilter();
+
Review comment:
remove extra line break
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+ Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
Review comment:
Can we add a comment here noting that the Pair represents partitionPath and
fileName?
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
##########
@@ -150,18 +152,30 @@ protected void commit(HoodieData<HoodieRecord>
hoodieDataRecords, String partiti
/**
* Tag each record with the location in the given partition.
- *
+ * <p>
* The record is tagged with respective file slice's location based on its
record key.
*/
- private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String
partitionName, int numFileGroups) {
- List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
partitionName);
- ValidationUtils.checkArgument(fileSlices.size() == numFileGroups,
String.format("Invalid number of file groups: found=%d, required=%d",
fileSlices.size(), numFileGroups));
-
- return records.stream().map(r -> {
- FileSlice slice =
fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
numFileGroups));
- final String instantTime = slice.isEmpty() ? "I" : "U";
- r.setCurrentLocation(new HoodieRecordLocation(instantTime,
slice.getFileId()));
- return r;
- }).collect(Collectors.toList());
+ private List<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
+ List<HoodieRecord> allPartitionRecords = null;
Review comment:
Can you create a follow-up JIRA to unify this code across Flink and
Spark using HoodieData, if possible?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +50,57 @@
private static final Logger LOG =
LogManager.getLogger(HoodieKeyLookupHandle.class);
- private final HoodieTableType tableType;
-
private final BloomFilter bloomFilter;
-
private final List<String> candidateRecordKeys;
-
+ private final Option<String> fileName;
+ private final boolean useMetadataTableIndex;
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
- Pair<String, String> partitionPathFilePair) {
- super(config, null, hoodieTable, partitionPathFilePair);
- this.tableType = hoodieTable.getMetaClient().getTableType();
+ Pair<String, String> partitionPathFileIDPair) {
+ this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+ }
+
+ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
+ Pair<String, String> partitionPathFileIDPair,
Option<String> fileName,
+ boolean useMetadataTableIndex) {
+ super(config, null, hoodieTable, partitionPathFileIDPair);
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
- HoodieTimer timer = new HoodieTimer().startTimer();
-
- try {
- this.bloomFilter = createNewFileReader().readBloomFilter();
- } catch (IOException e) {
- throw new HoodieIndexException(String.format("Error reading bloom filter
from %s: %s", partitionPathFilePair, e));
+ if (fileName.isPresent()) {
+
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()));
+ this.fileName = fileName;
+ } else {
+ this.fileName = Option.empty();
Review comment:
we can assign Option.empty at L55 and don't need this else block at all
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+ Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+ while (tuple2Iterator.hasNext()) {
+ Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+ final String partitionPath = entry._2.getPartitionPath();
+ final String fileId = entry._1;
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+ fileToKeysMap.computeIfAbsent(Pair.of(partitionPath,
fileIDBaseFileMap.get(fileId).getFileName()),
+ k -> new ArrayList<>()).add(entry._2);
+ }
+ if (fileToKeysMap.isEmpty()) {
+ return Collections.emptyListIterator();
+ }
+
+ List<Pair<String, String>> partitionNameFileNameList =
Review comment:
can we simplify as below
```
List<Pair<String, String>> partitionNameFileNameList =
fileToKeysMap.keySet().stream().collect(Collectors.toList())
```
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +635,90 @@ private static void
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
return records;
}
+ /**
+ * Convert rollback action metadata to bloom filter index records.
+ */
+ private static List<HoodieRecord>
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+
HoodieTableMetaClient dataMetaClient,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
+ if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+ return;
+ }
+
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ new PartitionIndexID(partition), new FileIndexID(deletedFile),
+ instantTime, ByteBuffer.allocate(0), true));
+ }));
+
+ partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ appendedFileMap.forEach((appendedFile, length) -> {
+ if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+ return;
+ }
+ final String pathWithPartition = partitionName + "/" + appendedFile;
+ final Path appendedFilePath = new Path(dataMetaClient.getBasePath(),
pathWithPartition);
+ try {
+ HoodieFileReader<IndexedRecord> fileReader =
+
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(),
appendedFilePath);
+ final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+ if (fileBloomFilter == null) {
+ LOG.error("Failed to read bloom filter for " + appendedFilePath);
+ return;
+ }
+ ByteBuffer bloomByteBuffer =
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+ HoodieRecord record =
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ new PartitionIndexID(partition), new FileIndexID(appendedFile),
instantTime,
+ bloomByteBuffer, false);
+ records.add(record);
+ fileReader.close();
+ } catch (IOException e) {
+ LOG.error("Failed to get bloom filter for file: " +
appendedFilePath);
+ }
+ });
+ });
+ return records;
+ }
+
+ /**
+ * Convert rollback action metadata to column stats index records.
+ */
+ private static List<HoodieRecord>
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+
HoodieTableMetaClient datasetMetaClient,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ List<String> latestColumns = getLatestColumns(datasetMetaClient);
+ partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ final String filePathWithPartition = partitionName + "/" + deletedFile;
+ records.addAll(getColumnStats(partition, filePathWithPartition,
datasetMetaClient,
Review comment:
update: getColumnStats handles how to go about deleted files; we don't
read the file for a deleted entry.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
Review comment:
I see we have this base file check in many places. Wondering if we are
doing too many checks. Can we try to do it once and remove the extraneous checks?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -124,14 +202,111 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
return records;
}
+ /**
+ * Convert commit action metadata to bloom filter records.
+ *
+ * @param commitMetadata - Commit action metadata
+ * @param dataMetaClient - Meta client for the data table
+ * @param instantTime - Action instant time
+ * @return List of metadata table records
+ */
+ public static List<HoodieRecord>
convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
+
HoodieTableMetaClient dataMetaClient,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((partitionStatName,
writeStats) -> {
+ final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME)
? NON_PARTITIONED_NAME : partitionStatName;
+ Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+ writeStats.forEach(hoodieWriteStat -> {
+ // No action for delta logs
+ if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
+ return;
+ }
+
+ String pathWithPartition = hoodieWriteStat.getPath();
+ if (pathWithPartition == null) {
+ // Empty partition
+ LOG.error("Failed to find path in write stat to update metadata
table " + hoodieWriteStat);
+ return;
+ }
+ int offset = partition.equals(NON_PARTITIONED_NAME) ?
(pathWithPartition.startsWith("/") ? 1 : 0) :
+ partition.length() + 1;
+
+ final String fileName = pathWithPartition.substring(offset);
+ if (!FSUtils.isBaseFile(new Path(fileName))) {
+ return;
+ }
+ ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate
files in HoodieCommitMetadata");
+
+ final Path writeFilePath = new Path(dataMetaClient.getBasePath(),
pathWithPartition);
+ try {
+ HoodieFileReader<IndexedRecord> fileReader =
+
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(),
writeFilePath);
+ try {
+ final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+ if (fileBloomFilter == null) {
+ LOG.error("Failed to read bloom filter for " + writeFilePath);
+ return;
+ }
+ ByteBuffer bloomByteBuffer =
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+ HoodieRecord record =
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ partition, fileName, instantTime, bloomByteBuffer, false);
+ records.add(record);
+ } catch (Exception e) {
+ LOG.error("Failed to read bloom filter for " + writeFilePath);
+ return;
Review comment:
Do we need to fail fast here, rather than moving ahead?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -124,6 +124,37 @@
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log
records. If disabled, hudi does look up of only interested entries.");
+ public static final ConfigProperty<Boolean> ENABLE_META_INDEX_BLOOM_FILTER =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.bloom.filter.enable")
+ .defaultValue(false)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Enable indexing user data files bloom filters under
metadata table. When enabled, "
+ + "metadata table will have a partition to store the bloom filter
index and will be "
+ + "used during the index lookups.");
+
+ public static final ConfigProperty<Boolean> ENABLE_META_INDEX_COLUMN_STATS =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.enable")
+ .defaultValue(false)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Enable indexing user data files column ranges under
metadata table key lookups. When "
+ + "enabled, metadata table will have a partition to store the column
ranges and will "
+ + "used for pruning files during the index lookups.");
+
+ public static final ConfigProperty<Boolean>
META_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS = ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.all_columns")
+ .defaultValue(false)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Enable indexing user data files column ranges under
metadata table key lookups. When "
+ + "enabled, metadata table will have a partition to store the column
ranges and will "
+ + "used for pruning files during the index lookups.");
+
Review comment:
we might need to support a subset of columns to be indexed, right? Will it
be part of future work? Do we have a tracking JIRA, please?
In fact, I would suggest we be cautious in supporting indexing all
columns. At least prefer to have some constraints. For example, only if the total
column count is less than, say, 25 or so, can we support indexing all columns.
If not, only a subset of columns as configured by the user should be supported.
We can jam more on how to go about this.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+ Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+ while (tuple2Iterator.hasNext()) {
+ Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+ final String partitionPath = entry._2.getPartitionPath();
+ final String fileId = entry._1;
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+ fileToKeysMap.computeIfAbsent(Pair.of(partitionPath,
fileIDBaseFileMap.get(fileId).getFileName()),
+ k -> new ArrayList<>()).add(entry._2);
+ }
+ if (fileToKeysMap.isEmpty()) {
+ return Collections.emptyListIterator();
+ }
+
+ List<Pair<String, String>> partitionNameFileNameList =
+ fileToKeysMap.keySet().stream().map(partitionNameFileNamePair -> {
+ return Pair.of(partitionNameFileNamePair.getLeft(),
partitionNameFileNamePair.getRight());
+ }).collect(Collectors.toList());
+
+ Map<Pair<String, String>, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+ fileToKeysMap.forEach((partitionPathFileIdPair, hoodieKeyList) -> {
Review comment:
partitionPathFileNamePair
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexLazyCheckFunction.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.io.HoodieKeyLookupHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexLazyCheckFunction implements
Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
Review comment:
can you add sufficient javadocs explaining what we do here and how it
differs from the other batch check function? Also, do add sufficient javadocs
for the other class too.
##########
File path: hudi-common/src/main/avro/HoodieMetadata.avsc
##########
@@ -30,27 +30,108 @@
"doc": "Type of the metadata record",
"type": "int"
},
- { "name": "filesystemMetadata",
+ {
"doc": "Contains information about partitions and files within the
dataset",
- "type": ["null", {
- "type": "map",
- "values": {
+ "name": "filesystemMetadata",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "HoodieMetadataFileInfo",
+ "fields": [
+ {
+ "name": "size",
+ "type": "long",
+ "doc": "Size of the file"
+ },
+ {
+ "name": "isDeleted",
+ "type": "boolean",
+ "doc": "True if this file has been deleted"
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "doc": "Metadata Index of bloom filters for all data files in the
user table",
+ "name": "BloomFilterMetadata",
+ "type": [
+ "null",
+ {
+ "doc": "Data file bloom filter details",
+ "name": "HoodieMetadataBloomFilter",
"type": "record",
- "name": "HoodieMetadataFileInfo",
"fields": [
{
- "name": "size",
- "type": "long",
- "doc": "Size of the file"
+ "doc": "Bloom filter type code",
+ "name": "type",
+ "type": "string"
+ },
+ {
+ "doc": "Instant timestamp when this metadata was
created/updated",
+ "name": "timestamp",
Review comment:
may I know where we use this? We don't have this for other partitions,
right (files/col stats)?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +636,88 @@ private static void
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
return records;
}
+ /**
+ * Convert rollback action metadata to bloom filter index records.
+ */
+ private static List<HoodieRecord>
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+
HoodieTableMetaClient dataMetaClient,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
+ if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+ return;
+ }
+
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
+ }));
+
+ partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ appendedFileMap.forEach((appendedFile, length) -> {
+ if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+ return;
+ }
+ final String pathWithPartition = partitionName + "/" + appendedFile;
+ final Path appendedFilePath = new Path(dataMetaClient.getBasePath(),
pathWithPartition);
+ try {
+ HoodieFileReader<IndexedRecord> fileReader =
Review comment:
we can probably create a private method to fetch the
BloomFilterMetadataRecord for a given file and use it everywhere required.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
+ "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+ final String bloomFilterKey = new
PartitionIndexID(partitionName).asBase64EncodedString()
+ .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+ HoodieKey key = new HoodieKey(bloomFilterKey,
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+ // TODO: Get the bloom filter type from the file
+ HoodieMetadataBloomFilter metadataBloomFilter =
+ new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+ timestamp, bloomFilter, isDeleted);
+ HoodieMetadataPayload metadataPayload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+ return new HoodieRecord<>(key, metadataPayload);
+ }
+
@Override
public HoodieMetadataPayload preCombine(HoodieMetadataPayload
previousRecord) {
ValidationUtils.checkArgument(previousRecord.type == type,
- "Cannot combine " + previousRecord.type + " with " + type);
-
- Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+ "Cannot combine " + previousRecord.type + " with " + type);
switch (type) {
- case PARTITION_LIST:
- case FILE_LIST:
- combinedFileInfo = combineFilesystemMetadata(previousRecord);
- break;
+ case METADATA_TYPE_PARTITION_LIST:
+ case METADATA_TYPE_FILE_LIST:
+ Map<String, HoodieMetadataFileInfo> combinedFileInfo =
combineFilesystemMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type, combinedFileInfo);
+ case METADATA_TYPE_BLOOM_FILTER:
+ HoodieMetadataBloomFilter combineBloomFilterMetadata =
combineBloomFilterMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type,
combineBloomFilterMetadata);
+ case METADATA_TYPE_COLUMN_STATS:
+ return new HoodieMetadataPayload(key, type,
combineColumnStats(previousRecord));
default:
throw new HoodieMetadataException("Unknown type of
HoodieMetadataPayload: " + type);
}
+ }
+
+ private HoodieMetadataBloomFilter
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+ return this.bloomFilterMetadata;
+ }
Review comment:
The reason we are going w/ this is that one metadata payload can contain only
one record pertaining to just one file. Hence, either it will be a new file
or it will represent a deleted file, and so we are good.
But it's a different case with filesystem metadata, because one record (e.g., for
partition p1) might have had 3 files in record1 and could have just 2 in
record2, and so we need detailed combine logic.
But that's not required for col stats or bloom.
Is my understanding right?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
.map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
- Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
- config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD,
context);
- int inputParallelism =
-
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+ int inputParallelism =
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
int joinParallelism = Math.max(inputParallelism,
config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism:
${"
+ config.getBloomIndexParallelism() + "}");
- if (config.useBloomIndexBucketizedChecking()) {
+ JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+ if (config.isMetaIndexBloomFilterEnabled()) {
+ // Step 1: Sort by file id
+ JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+ fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);
+
+ // Step 2: Use bloom filter to filter and the actual log file to get the
record location
+ final boolean isBloomFiltersBatchLoadEnabled =
config.isMetaIndexBloomFilterBatchLoadEnabled();
+ if (isBloomFiltersBatchLoadEnabled) {
Review comment:
Something to think about — not required in this patch:
similar to how we compute comparisonsPerFileGroup, we could try to fetch how many
file groups are involved and then make an informed decision whether to go with
batch load or lazy evaluation here.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,6 +156,111 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, HoodieMetadataCon
.getAllFilesInPartitions(partitions);
}
+ @Override
+ public Option<ByteBuffer> getBloomFilter(final String partitionName, final
String fileName)
+ throws HoodieMetadataException {
+ if (!isMetaIndexBloomFilterEnabled) {
+ LOG.error("Meta index for bloom filters is disabled!");
+ return Option.empty();
+ }
+
+ final Pair<String, String> partitionFileName = Pair.of(partitionName,
fileName);
+ Map<Pair<String, String>, ByteBuffer> bloomFilters =
getBloomFilters(Collections.singletonList(partitionFileName));
+ if (bloomFilters.isEmpty()) {
+ LOG.error("Meta index: missing bloom filter for partition: " +
partitionName + ", file: " + fileName);
+ return Option.empty();
+ }
+
+ ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+ return Option.of(bloomFilters.get(partitionFileName));
+ }
+
+ @Override
+ public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList)
+ throws HoodieMetadataException {
+ if (!isMetaIndexBloomFilterEnabled) {
+ LOG.error("Meta index for bloom filter is disabled!");
+ return Collections.emptyMap();
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+ Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+ final String bloomKey = new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+ .concat(new
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+ partitionIDFileIDSortedStrings.add(bloomKey);
+ fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+ }
+ );
+
+ List<String> partitionIDFileIDStrings = new
ArrayList<>(partitionIDFileIDSortedStrings);
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
hoodieRecordList =
+ getRecordsByKeys(partitionIDFileIDStrings,
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
timer.endTimer()));
+
+ Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new
HashMap<>();
+ for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry
: hoodieRecordList) {
+ if (entry.getRight().isPresent()) {
+ final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+ entry.getRight().get().getData().getBloomFilterMetadata();
+ if (bloomFilterMetadata.isPresent()) {
+ if (!bloomFilterMetadata.get().getIsDeleted()) {
+
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()),
bloomFilterMetadata.get().getBloomFilter());
+ }
+ } else {
+ LOG.error("Meta index bloom filter missing for: " +
fileToKeyMap.get(entry.getLeft()));
+ }
+ }
+ }
+ return partitionFileToBloomFilterMap;
+ }
+
+ @Override
+ public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+ throws HoodieMetadataException {
+ if (!isMetaIndexColumnStatsEnabled) {
+ LOG.error("Meta index for column stats is disabled!");
+ return Collections.emptyMap();
+ }
+
+ Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new
HashMap<>();
+ List<String> columnStatKeys = new ArrayList<>();
+ final String columnIndexStr = new
ColumnIndexID(columnName).asBase64EncodedString();
+ for (Pair<String, String> partitionNameFileNamePair :
partitionNameFileNameList) {
+ final String columnStatIndexKey = columnIndexStr
+ .concat(new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString())
+ .concat(new
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+ columnStatKeys.add(columnStatIndexKey);
Review comment:
Shouldn't we add this to a TreeSet first and then add it to the list? In my
understanding, we need these keys to be sorted.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +795,119 @@ public static int mapRecordKeyToFileGroupIndex(String
recordKey, int numFileGrou
return fileSliceStream.sorted((s1, s2) ->
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
}
+ public static List<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+
HoodieEngineContext engineContext,
+
HoodieTableMetaClient dataMetaClient,
+ boolean
isMetaIndexColumnStatsForAllColumns,
+ String
instantTime) {
+
+ try {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+ return
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext,
dataMetaClient, allWriteStats,
+ isMetaIndexColumnStatsForAllColumns);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for
metadata table ", e);
+ }
+ }
+
+ /**
+ * Create column stats from write status.
+ *
+ * @param engineContext - Enging context
+ * @param datasetMetaClient - Dataset meta client
+ * @param allWriteStats - Write status to convert
+ * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for
indexing
+ */
+ public static List<HoodieRecord>
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+
HoodieTableMetaClient datasetMetaClient,
+
List<HoodieWriteStat> allWriteStats,
+ boolean
isMetaIndexColumnStatsForAllColumns) throws Exception {
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<HoodieWriteStat> prunedWriteStats =
allWriteStats.stream().filter(writeStat -> {
+ return !(writeStat instanceof HoodieDeltaWriteStat);
+ }).collect(Collectors.toList());
+ if (prunedWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return engineContext.flatMap(prunedWriteStats,
+ writeStat -> translateWriteStatToColumnStats(writeStat,
datasetMetaClient,
+ getLatestColumns(datasetMetaClient,
isMetaIndexColumnStatsForAllColumns)),
+ prunedWriteStats.size());
+ }
+
+ /**
+ * Get the latest columns for the table for column stats indexing.
+ *
+ * @param datasetMetaClient - Data table meta client
+ * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing
enabled for all columns
+ */
+ private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+ if (!isMetaIndexColumnStatsForAllColumns) {
+ return
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+ }
+
+ if
(datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
> 1) {
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(datasetMetaClient);
+ // consider nested fields as well. if column stats is enabled only for a
subset of columns,
+ // directly use them instead of all columns from the latest table schema
+ try {
+ return schemaResolver.getTableAvroSchema().getFields().stream()
+ .map(entry -> entry.name()).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get latest columns for " +
datasetMetaClient.getBasePath());
+ }
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient) {
+ return getLatestColumns(datasetMetaClient, false);
Review comment:
So it looks like we index only the record key in this patch, is that right?
I mean, we don't support adding the index for all columns — is my understanding
correct?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -109,55 +191,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
+ "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+ final String bloomFilterKey = new
PartitionIndexID(partitionName).asBase64EncodedString()
+ .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+ HoodieKey key = new HoodieKey(bloomFilterKey,
MetadataPartitionType.BLOOM_FILTERS.partitionPath());
+
+ // TODO: Get the bloom filter type from the file
+ HoodieMetadataBloomFilter metadataBloomFilter =
+ new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+ timestamp, bloomFilter, isDeleted);
+ HoodieMetadataPayload metadataPayload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+ return new HoodieRecord<>(key, metadataPayload);
+ }
+
@Override
public HoodieMetadataPayload preCombine(HoodieMetadataPayload
previousRecord) {
ValidationUtils.checkArgument(previousRecord.type == type,
- "Cannot combine " + previousRecord.type + " with " + type);
-
- Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+ "Cannot combine " + previousRecord.type + " with " + type);
switch (type) {
- case PARTITION_LIST:
- case FILE_LIST:
- combinedFileInfo = combineFilesystemMetadata(previousRecord);
- break;
+ case METADATA_TYPE_PARTITION_LIST:
+ case METADATA_TYPE_FILE_LIST:
+ Map<String, HoodieMetadataFileInfo> combinedFileInfo =
combineFilesystemMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type, combinedFileInfo);
+ case METADATA_TYPE_BLOOM_FILTER:
+ HoodieMetadataBloomFilter combineBloomFilterMetadata =
combineBloomFilterMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type,
combineBloomFilterMetadata);
+ case METADATA_TYPE_COLUMN_STATS:
+ return new HoodieMetadataPayload(key, type,
combineColumnStats(previousRecord));
Review comment:
nit: combineColumnStatsMetadata
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
.map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
- Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
- config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD,
context);
- int inputParallelism =
-
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+ int inputParallelism =
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
int joinParallelism = Math.max(inputParallelism,
config.getBloomIndexParallelism());
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism:
${"
+ config.getBloomIndexParallelism() + "}");
- if (config.useBloomIndexBucketizedChecking()) {
+ JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+ if (config.isMetaIndexBloomFilterEnabled()) {
+ // Step 1: Sort by file id
+ JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+ fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);
+
+ // Step 2: Use bloom filter to filter and the actual log file to get the
record location
+ final boolean isBloomFiltersBatchLoadEnabled =
config.isMetaIndexBloomFilterBatchLoadEnabled();
+ if (isBloomFiltersBatchLoadEnabled) {
Review comment:
Because I see that batch load is enabled by default; for larger datasets,
we should probably go with lazy evaluation.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -735,14 +768,19 @@ protected void bootstrapCommit(List<DirectoryInfo>
partitionInfoList, String cre
List<String> partitions = partitionInfoList.stream().map(p ->
p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME :
p.getRelativePath()).collect(Collectors.toList());
final int totalFiles = partitionInfoList.stream().mapToInt(p ->
p.getTotalFiles()).sum();
+ final Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordsMap = new HashMap<>();
// Record which saves the list of all partitions
HoodieRecord allPartitionRecord =
HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
- // in case of boostrapping of a fresh table, there won't be any
partitions, but we need to make a boostrap commit
-
commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord),
1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
+ // in case of bootstrapping of a fresh table, there won't be any
partitions, but we need to make a boostrap commit
+ final HoodieData<HoodieRecord> allPartitionRecordsRDD =
engineContext.parallelize(
+ Collections.singletonList(allPartitionRecord), 1);
+ partitionToRecordsMap.put(MetadataPartitionType.FILES,
allPartitionRecordsRDD);
+ commit(createInstantTime, partitionToRecordsMap, false);
Review comment:
Can you please link the JIRA here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -109,6 +109,7 @@
protected boolean enabled;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
+ protected final List<MetadataPartitionType> enabledPartitionTypes;
Review comment:
Can you please link the tracking JIRA here so that we don't miss it?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomMetaIndexBatchCheckFunction.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Function performing actual checking of RDD partition containing (fileId,
hoodieKeys) against the actual files.
+ */
+public class HoodieBloomMetaIndexBatchCheckFunction implements
+ Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
Iterator<List<HoodieKeyLookupResult>>> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieBloomMetaIndexBatchCheckFunction.class);
+ private final HoodieTable hoodieTable;
+ private final HoodieWriteConfig config;
+
+ public HoodieBloomMetaIndexBatchCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
+ this.hoodieTable = hoodieTable;
+ this.config = config;
+ }
+
+ @Override
+ public Iterator<List<HoodieKeyLookupResult>> call(Integer integer,
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+ List<List<HoodieKeyLookupResult>> resultList = new ArrayList<>();
+ Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+
+ final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+ while (tuple2Iterator.hasNext()) {
+ Tuple2<String, HoodieKey> entry = tuple2Iterator.next();
+ final String partitionPath = entry._2.getPartitionPath();
+ final String fileId = entry._1;
+ if (!fileIDBaseFileMap.containsKey(fileId)) {
+ Option<HoodieBaseFile> baseFile =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFile.isPresent()) {
+ throw new HoodieIndexException("Failed to find the base file for
partition: " + partitionPath
+ + ", fileId: " + fileId);
+ }
+ fileIDBaseFileMap.put(fileId, baseFile.get());
+ }
+ fileToKeysMap.computeIfAbsent(Pair.of(partitionPath,
fileIDBaseFileMap.get(fileId).getFileName()),
+ k -> new ArrayList<>()).add(entry._2);
+ }
+ if (fileToKeysMap.isEmpty()) {
+ return Collections.emptyListIterator();
+ }
+
+ List<Pair<String, String>> partitionNameFileNameList =
Review comment:
or
```
List<Pair<String, String>> partitionNameFileNameList = new
ArrayList<>(fileToKeysMap.keySet());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]