codope commented on a change in pull request #4740:
URL: https://github.com/apache/hudi/pull/4740#discussion_r799291405
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -53,56 +46,24 @@
   private final BloomFilter bloomFilter;
   private final List<String> candidateRecordKeys;
-  private final boolean useMetadataTableIndex;
-  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;

   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
-  }
-
-  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
-                               boolean useMetadataTableIndex) {
     super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    if (fileName.isPresent()) {
-      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
-          "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
-      this.fileName = fileName;
-    }
-    this.useMetadataTableIndex = useMetadataTableIndex;
     this.bloomFilter = getBloomFilter();
   }

   private BloomFilter getBloomFilter() {
-    BloomFilter bloomFilter = null;
     HoodieTimer timer = new HoodieTimer().startTimer();
-    try {
-      if (this.useMetadataTableIndex) {
-        ValidationUtils.checkArgument(this.fileName.isPresent(),
-            "File name not available to fetch bloom filter from the metadata table index.");
-        Option<ByteBuffer> bloomFilterByteBuffer =
-            hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
-        if (!bloomFilterByteBuffer.isPresent()) {
-          throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
-        }
-        bloomFilter =
-            new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
-                BloomFilterTypeCode.DYNAMIC_V0);
-      } else {
-        try (HoodieFileReader reader = createNewFileReader()) {
-          bloomFilter = reader.readBloomFilter();
-        }
-      }
+    try (HoodieFileReader reader = createNewFileReader()) {
+      LOG.debug(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
+      return reader.readBloomFilter();
Review comment:
I see you have removed `useMetadataTableIndex`. Will `readBloomFilter()` use the MT bloom index by default?
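Just to make sure I'm reading the new flow correctly, a hypothetical sketch of where I'd expect that routing to live now (the config accessor and branch contents are illustrative, not necessarily this PR's API):

```java
// Hypothetical sketch: with `useMetadataTableIndex` gone from the handle, the
// choice between the metadata-table bloom index and the file-footer lookup
// would have to be made upstream in the index layer (accessor name assumed).
if (config.getBloomIndexUseMetadata()) {
  // MT-backed path, e.g. via HoodieMetadataBloomIndexCheckFunction
} else {
  // footer-backed path via HoodieKeyLookupHandle, which now always calls
  // reader.readBloomFilter() on the base file
}
```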
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -113,7 +110,7 @@ protected void start() {
     }
     List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
-    Map<Pair<String, String>, ByteBuffer> fileToBloomFilterMap =
+    Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
Review comment:
Is my understanding correct that the change from `ByteBuffer` to `BloomFilter` is only to avoid repetitive decoding? Is there any other reason?
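If that is the motivation, it makes sense to me; roughly, the benefit I see is this (sketch; `rawBloomFilterByteBuffers` is a stand-in name for whatever the metadata table returns):

```java
// Decode each file's bloom filter bytes exactly once while building the map...
Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap = new HashMap<>();
rawBloomFilterByteBuffers.forEach((partitionFileNamePair, byteBuffer) ->
    fileToBloomFilterMap.put(partitionFileNamePair,
        new HoodieDynamicBoundedBloomFilter(
            StandardCharsets.UTF_8.decode(byteBuffer).toString(),
            BloomFilterTypeCode.DYNAMIC_V0)));
// ...instead of re-decoding the same ByteBuffer for every key probed against that file.
```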
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -53,56 +46,24 @@
   private final BloomFilter bloomFilter;
   private final List<String> candidateRecordKeys;
-  private final boolean useMetadataTableIndex;
-  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;

   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
-    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
-  }
-
-  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
-                               Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
-                               boolean useMetadataTableIndex) {
     super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    if (fileName.isPresent()) {
-      ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
-          "File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
-      this.fileName = fileName;
-    }
-    this.useMetadataTableIndex = useMetadataTableIndex;
     this.bloomFilter = getBloomFilter();
   }

   private BloomFilter getBloomFilter() {
-    BloomFilter bloomFilter = null;
     HoodieTimer timer = new HoodieTimer().startTimer();
-    try {
-      if (this.useMetadataTableIndex) {
-        ValidationUtils.checkArgument(this.fileName.isPresent(),
-            "File name not available to fetch bloom filter from the metadata table index.");
-        Option<ByteBuffer> bloomFilterByteBuffer =
-            hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
-        if (!bloomFilterByteBuffer.isPresent()) {
-          throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
-        }
-        bloomFilter =
-            new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
-                BloomFilterTypeCode.DYNAMIC_V0);
-      } else {
-        try (HoodieFileReader reader = createNewFileReader()) {
-          bloomFilter = reader.readBloomFilter();
-        }
-      }
+    try (HoodieFileReader reader = createNewFileReader()) {
+      LOG.debug(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
Review comment:
The bloom filter is read on the next line, so why end the timer here?
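i.e., I would expect something like this (untested sketch; the catch clause is shown only for shape):

```java
private BloomFilter getBloomFilter() {
  HoodieTimer timer = new HoodieTimer().startTimer();
  try (HoodieFileReader reader = createNewFileReader()) {
    BloomFilter bloomFilter = reader.readBloomFilter();
    // Stop the timer only after the read has actually happened.
    LOG.debug(String.format("Read bloom filter from %s in %d ms",
        partitionPathFileIDPair, timer.endTimer()));
    return bloomFilter;
  } catch (IOException e) {
    throw new HoodieIndexException(
        "Error reading bloom filter from " + partitionPathFileIDPair, e);
  }
}
```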
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -203,15 +206,19 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
         (timer.endTimer() / partitionIDFileIDStrings.size())));
-    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>();
+    Map<Pair<String, String>, BloomFilter> partitionFileToBloomFilterMap = new HashMap<>();
     for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) {
       if (entry.getRight().isPresent()) {
         final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
             entry.getRight().get().getData().getBloomFilterMetadata();
Review comment:
The method throws `HoodieMetadataException`, but this chained call can throw other exceptions as well. Should we handle it in a try-catch?
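e.g., something along these lines (sketch; the wrapping and message text are illustrative):

```java
final Option<HoodieMetadataBloomFilter> bloomFilterMetadata;
try {
  // Surface any failure in the chained call as the documented exception type.
  bloomFilterMetadata = entry.getRight().get().getData().getBloomFilterMetadata();
} catch (Exception e) {
  throw new HoodieMetadataException(
      "Failed to read bloom filter metadata for " + entry.getLeft(), e);
}
```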
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetadataRecordsGenerationParams implements Serializable {
+
+  private final HoodieTableMetaClient dataMetaClient;
Review comment:
Do we need to serialize `dataMetaClient` as well?
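If we don't, one option is to keep it out of the serialized state and rebuild it lazily from serializable pieces, roughly like this (hypothetical sketch; field names and builder usage from memory, not this PR's code):

```java
// Hypothetical alternative: serialize only the base path + hadoop conf and
// rebuild the meta client lazily where it is needed.
private final String dataBasePath;
private final SerializableConfiguration hadoopConf;
private transient HoodieTableMetaClient dataMetaClient;

HoodieTableMetaClient getDataMetaClient() {
  if (dataMetaClient == null) {
    dataMetaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf.get())
        .setBasePath(dataBasePath)
        .build();
  }
  return dataMetaClient;
}
```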
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -186,94 +179,92 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
   /**
    * Convert commit action metadata to bloom filter records.
    *
-   * @param commitMetadata - Commit action metadata
-   * @param dataMetaClient - Meta client for the data table
-   * @param instantTime    - Action instant time
-   * @return List of metadata table records
+   * @param context                 - Engine context to use
+   * @param commitMetadata          - Commit action metadata
+   * @param instantTime             - Action instant time
+   * @param recordsGenerationParams - Parameters for bloom filter record generation
+   * @return HoodieData of metadata table records
    */
-  public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
-                                                                       HoodieTableMetaClient dataMetaClient,
-                                                                       String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        // No action for delta logs
-        if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
-          return;
-        }
+  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
+      HoodieEngineContext context, HoodieCommitMetadata commitMetadata,
+      String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) {
+    final List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+    if (allWriteStats.isEmpty()) {
+      return context.emptyHoodieData();
+    }

-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) :
-            partition.length() + 1;
+    HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats,
+        Math.max(recordsGenerationParams.getBloomIndexParallelism(), allWriteStats.size()));
+    return allWriteStatsRDD.flatMap(hoodieWriteStat -> {
+      final String partition = hoodieWriteStat.getPartitionPath();

-        final String fileName = pathWithPartition.substring(offset);
-        if (!FSUtils.isBaseFile(new Path(fileName))) {
-          return;
-        }
-        ValidationUtils.checkState(!newFiles.containsKey(fileName), "Duplicate files in HoodieCommitMetadata");
+      // TODO: HUDI-1492 Delta write stat handling for schemes supporting appends
+      if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
+        return Collections.emptyListIterator();
+      }

-        final Path writeFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition);
+      String pathWithPartition = hoodieWriteStat.getPath();
+      if (pathWithPartition == null) {
+        // Empty partition
+        LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat);
+        return Collections.emptyListIterator();
+      }
+      int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) :
+          partition.length() + 1;
+
+      final String fileName = pathWithPartition.substring(offset);
+      if (!FSUtils.isBaseFile(new Path(fileName))) {
+        return Collections.emptyListIterator();
+      }
+
+      final Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+      try (HoodieFileReader<IndexedRecord> fileReader =
+               HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) {
         try {
-          HoodieFileReader<IndexedRecord> fileReader =
-              HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), writeFilePath);
-          try {
-            final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-            if (fileBloomFilter == null) {
-              LOG.error("Failed to read bloom filter for " + writeFilePath);
-              return;
-            }
-            ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
-            HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
-                partition, fileName, instantTime, bloomByteBuffer, false);
-            records.add(record);
-          } catch (Exception e) {
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
             LOG.error("Failed to read bloom filter for " + writeFilePath);
-            return;
+            return Collections.emptyListIterator();
           }
+          ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
Review comment:
This part looks similar to what is being done at #L650. Is there a way to extract it and enhance reuse?
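Rough shape of what I had in mind (hypothetical helper; the name, signature, and `Option` return are illustrative, and both call sites would pass their own path and params):

```java
// Hypothetical extracted helper, reusable both here and at #L650:
private static Option<HoodieRecord> readBloomFilterMetadataRecord(
    Configuration hadoopConf, Path baseFilePath, String partition,
    String fileName, String instantTime, String bloomFilterType) {
  try (HoodieFileReader<IndexedRecord> fileReader =
           HoodieFileReaderFactory.getFileReader(hadoopConf, baseFilePath)) {
    final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
    if (fileBloomFilter == null) {
      LOG.error("Failed to read bloom filter for " + baseFilePath);
      return Option.empty();
    }
    ByteBuffer bloomByteBuffer =
        ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
    return Option.of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
        partition, fileName, instantTime, bloomFilterType, bloomByteBuffer, false));
  } catch (IOException e) {
    LOG.error("Failed to read bloom filter for " + baseFilePath, e);
    return Option.empty();
  }
}
```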
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]