nsivabalan commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r795049484
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -199,6 +260,21 @@ public Builder enable(boolean enable) {
return this;
}
+ public Builder withMetadataIndexBloomFilter(boolean enable) {
+ metadataConfig.setValue(ENABLE_METADATA_INDEX_BLOOM_FILTER,
String.valueOf(enable));
+ return this;
+ }
+
+ public Builder withMetadataIndexColumnStats(boolean enable) {
+ metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS,
String.valueOf(enable));
+ return this;
+ }
+
+ public Builder withMetadataIndexForAllColumns(boolean enable) {
Review comment:
I don't see setter methods for file group count?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +128,43 @@ private void initIfNeeded() {
return recordsByKeys.size() == 0 ? Option.empty() :
recordsByKeys.get(0).getValue();
}
- protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys, String partitionName) {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReadersIfNeeded(keys.get(0), partitionName);
- try {
- List<Long> timings = new ArrayList<>();
- HoodieFileReader baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner =
readers.getRight();
+ @Override
+ protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys,
+
String partitionName) {
+ Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
getPartitionFileSlices(partitionName, keys);
Review comment:
I have added comments down below, but getPartitionFileSlices should
follow the same semantics as openReadersIfNeeded. Maybe we should try to cache
the file slices and reuse them if possible. I see that we don't do that as of
now in this patch. Prior to this patch, this call was made within
openReadersIfNeeded. Or do you think there are any challenges with that?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
- HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
- HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
+ "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+ final String bloomFilterKey = new
PartitionIndexID(partitionName).asBase64EncodedString()
+ .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+ HoodieKey key = new HoodieKey(bloomFilterKey,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+
+ // TODO: Get the bloom filter type from the file
+ HoodieMetadataBloomFilter metadataBloomFilter =
+ new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+ timestamp, bloomFilter, isDeleted);
+ HoodieMetadataPayload metadataPayload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+ return new HoodieRecord<>(key, metadataPayload);
+ }
+
@Override
public HoodieMetadataPayload preCombine(HoodieMetadataPayload
previousRecord) {
ValidationUtils.checkArgument(previousRecord.type == type,
- "Cannot combine " + previousRecord.type + " with " + type);
-
- Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+ "Cannot combine " + previousRecord.type + " with " + type);
switch (type) {
- case PARTITION_LIST:
- case FILE_LIST:
- combinedFileInfo = combineFilesystemMetadata(previousRecord);
- break;
+ case METADATA_TYPE_PARTITION_LIST:
+ case METADATA_TYPE_FILE_LIST:
+ Map<String, HoodieMetadataFileInfo> combinedFileInfo =
combineFilesystemMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type, combinedFileInfo);
+ case METADATA_TYPE_BLOOM_FILTER:
+ HoodieMetadataBloomFilter combineBloomFilterMetadata =
combineBloomFilterMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type,
combineBloomFilterMetadata);
+ case METADATA_TYPE_COLUMN_STATS:
+ return new HoodieMetadataPayload(key, type,
combineColumnStatsMetadatat(previousRecord));
default:
throw new HoodieMetadataException("Unknown type of
HoodieMetadataPayload: " + type);
}
+ }
+
+ private HoodieMetadataBloomFilter
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+ return this.bloomFilterMetadata;
+ }
- return new HoodieMetadataPayload(key, type, combinedFileInfo);
+ private HoodieColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload
previousRecord) {
+ return this.columnStatMetadata;
Review comment:
Just so we are on the same page:
for the bloom filter partition, I understand that a file will either be added
or deleted, and there are no mutations as such. But for column stats, if a file
is updated, we fetch the column stats from the latest file, so we should be
good to completely ignore the older version of the record?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
}
/**
- * Returns a new pair of readers to the base and log files.
+ * Get the file slice details for the given key in a partition.
+ *
+ * @param partitionName - Metadata partition name
+ * @param key - Key to get the file slice for
+ * @return Partition and file slice pair for the given key
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReadersIfNeeded(String key, String partitionName) {
- return partitionReaders.computeIfAbsent(partitionName, k -> {
- try {
- final long baseFileOpenMs;
- final long logScannerOpenMs;
- HoodieFileReader baseFileReader = null;
- HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+ private Pair<String, FileSlice> getPartitionFileSlice(final String
partitionName, final String key) {
+ // Metadata is in sync till the latest completed instant on the dataset
+ List<FileSlice> latestFileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
partitionName);
+
+ final FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+ latestFileSlices.size()));
+ return Pair.of(partitionName, slice);
+ }
+
+ /**
+ * Get the latest file slices for the interested keys in a given partition.
+ *
+ * @param partitionName - Partition to get the file slices from
+ * @param keys - Interested keys
+ * @return FileSlices for the keys
+ */
+ private Map<Pair<String, FileSlice>, List<String>>
getPartitionFileSlices(final String partitionName, final List<String> keys) {
+ // Metadata is in sync till the latest completed instant on the dataset
+ List<FileSlice> latestFileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
partitionName);
+
+ Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
new HashMap<>();
+ for (String key : keys) {
+ final FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+ latestFileSlices.size()));
+ final Pair<String, FileSlice> keyFileSlicePair = Pair.of(partitionName,
slice);
+ partitionFileSliceToKeysMap.computeIfAbsent(keyFileSlicePair, k -> new
ArrayList<>()).add(key);
+ }
+ return partitionFileSliceToKeysMap;
+ }
- // Metadata is in sync till the latest completed instant on the dataset
+ /**
+ * Create a file reader and the record scanner for a given partition and
file slice
+ * if readers are not already available.
+ *
+ * @param partitionName - Partition name
+ * @param slice - The file slice to open readers for
+ * @return File reader and the record scanner pair for the requested file
slice
+ */
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReadersIfNeeded(String partitionName, FileSlice slice) {
+ return partitionReaders.computeIfAbsent(Pair.of(partitionName,
slice.getFileId()), k -> {
+ try {
HoodieTimer timer = new HoodieTimer().startTimer();
- List<FileSlice> latestFileSlices =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
partitionName);
- if (latestFileSlices.size() == 0) {
- // empty partition
- return Pair.of(null, null);
- }
- ValidationUtils.checkArgument(latestFileSlices.size() == 1,
String.format("Invalid number of file slices: found=%d, required=%d",
latestFileSlices.size(), 1));
- final FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
latestFileSlices.size()));
// Open base file reader
Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair =
getBaseFileReader(slice, timer);
- baseFileReader = baseFileReaderOpenTimePair.getKey();
- baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
+ HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
+ final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest
file slice
- Pair<HoodieMetadataMergedLogRecordReader, Long>
logRecordScannerOpenTimePair = getLogRecordScanner(slice,
- partitionName);
- logRecordScanner = logRecordScannerOpenTimePair.getKey();
- logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
+ Pair<HoodieMetadataMergedLogRecordReader, Long>
logRecordScannerOpenTimePair = getLogRecordScanner(slice, partitionName);
+ HoodieMetadataMergedLogRecordReader logRecordScanner =
logRecordScannerOpenTimePair.getKey();
+ final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
- metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs +
logScannerOpenMs));
+ metrics.ifPresent(metrics ->
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
Review comment:
Minor: can we fix the name of the metric
(HoodieMetadataMetrics.SCAN_STR)? This metric represents the time to just open
the readers, not the entire scan time.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -94,13 +136,60 @@ public HoodieMetadataPayload(Option<GenericRecord> record)
{
filesystemMetadata.put(k.toString(), new
HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted")));
});
}
+
+ if (type == METADATA_TYPE_BLOOM_FILTER) {
+ final GenericRecord metadataRecord = (GenericRecord)
record.get().get(SCHEMA_FIELD_ID_BLOOM_FILTER);
+ if (metadataRecord == null) {
+ throw new HoodieMetadataException("Valid " +
SCHEMA_FIELD_ID_BLOOM_FILTER + " record expected for type: " +
METADATA_TYPE_BLOOM_FILTER);
+ }
+ bloomFilterMetadata = new HoodieMetadataBloomFilter(
+ (String) metadataRecord.get(BLOOM_FILTER_FIELD_TYPE),
+ (String) metadataRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP),
+ (ByteBuffer) metadataRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER),
+ (Boolean) metadataRecord.get(BLOOM_FILTER_FIELD_IS_DELETED)
+ );
+ }
+
+ if (type == METADATA_TYPE_COLUMN_STATS) {
+ GenericRecord v = (GenericRecord)
record.get().get(SCHEMA_FIELD_ID_COLUMN_STATS);
+ if (v == null) {
+ throw new HoodieMetadataException("Valid " +
SCHEMA_FIELD_ID_COLUMN_STATS + " record expected for type: " +
METADATA_TYPE_COLUMN_STATS);
+ }
+ columnStatMetadata = new HoodieColumnStats(
+ (String) v.get(COLUMN_STATS_FIELD_RESOURCE_NAME),
+ (String) v.get(COLUMN_STATS_FIELD_MIN_VALUE),
+ (String) v.get(COLUMN_STATS_FIELD_MAX_VALUE),
+ (Long) v.get(COLUMN_STATS_FIELD_NULL_COUNT),
+ (Long) v.get(COLUMN_STATS_FIELD_VALUE_COUNT),
+ (Long) v.get(COLUMN_STATS_FIELD_TOTAL_SIZE),
+ (Long) v.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE),
+ (Boolean) v.get(COLUMN_STATS_FIELD_IS_DELETED)
+ );
+ }
}
}
private HoodieMetadataPayload(String key, int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
+ this(key, type, filesystemMetadata, null, null);
+ }
+
+ private HoodieMetadataPayload(String key, int type,
HoodieMetadataBloomFilter metadataBloomFilter) {
+ this(key, type, null, metadataBloomFilter, null);
+ }
+
+ private HoodieMetadataPayload(String key, int type, HoodieColumnStats
columnStats) {
Review comment:
Do you think we should name this HoodieMetadataColumnStats to be in line
with HoodieMetadataBloomFilter?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -134,22 +139,41 @@ public BloomFilter readBloomFilter() {
}
@Override
- public Set<String> filterRowKeys(Set candidateRowKeys) {
- // Current implementation reads all records and filters them. In certain
cases, it many be better to:
- // 1. Scan a limited subset of keys (min/max range of candidateRowKeys)
- // 2. Lookup keys individually (if the size of candidateRowKeys is much
less than the total keys in file)
- try {
- List<Pair<String, R>> allRecords = readAllRecords();
- Set<String> rowKeys = new HashSet<>();
- allRecords.forEach(t -> {
- if (candidateRowKeys.contains(t.getFirst())) {
- rowKeys.add(t.getFirst());
- }
- });
- return rowKeys;
- } catch (IOException e) {
- throw new HoodieIOException("Failed to read row keys from " + path, e);
+ public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+ return candidateRowKeys.stream().filter(k -> {
+ try {
+ return isKeyAvailable(k);
Review comment:
I see that prior to this patch, we were reading all records and then
looking up the candidateRowKeys. Here we are doing a point lookup for one key
at a time. Is this based on some perf tuning? I will sync up with you
face-to-face to understand more. Can you add a line in the Javadoc that this
method expects candidate keys to be sorted? That way, if someone develops more
code using this in the future, they would know the contract of this method.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +128,43 @@ private void initIfNeeded() {
return recordsByKeys.size() == 0 ? Option.empty() :
recordsByKeys.get(0).getValue();
}
- protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys, String partitionName) {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReadersIfNeeded(keys.get(0), partitionName);
- try {
- List<Long> timings = new ArrayList<>();
- HoodieFileReader baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner =
readers.getRight();
+ @Override
+ protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys,
+
String partitionName) {
+ Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap =
getPartitionFileSlices(partitionName, keys);
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
new ArrayList<>();
+ AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+ partitionFileSliceToKeysMap.forEach((partitionFileSlicePair,
fileSliceKeys) -> {
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReadersIfNeeded(partitionName,
+ partitionFileSlicePair.getRight());
+ try {
+ List<Long> timings = new ArrayList<>();
+ HoodieFileReader baseFileReader = readers.getKey();
+ HoodieMetadataMergedLogRecordReader logRecordScanner =
readers.getRight();
- if (baseFileReader == null && logRecordScanner == null) {
- return Collections.emptyList();
- }
+ if (baseFileReader == null && logRecordScanner == null) {
+ return;
+ }
- // local map to assist in merging with base file records
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecords(logRecordScanner, keys, timings);
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
readFromBaseAndMergeWithLogRecords(
- baseFileReader, keys, logRecords, timings, partitionName);
- LOG.info(String.format("Metadata read for %s keys took [baseFileRead,
logMerge] %s ms", keys.size(), timings));
- return result;
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for " + keys.size() + " key : ", ioe);
- } finally {
- if (!reuse) {
- close(partitionName);
+ // local map to assist in merging with base file records
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecords(logRecordScanner,
+ fileSliceKeys, timings);
+ result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader,
fileSliceKeys, logRecords,
+ timings, partitionName));
+ LOG.debug(String.format("Metadata read for %s keys took [baseFileRead,
logMerge] %s ms",
+ fileSliceKeys.size(), timings));
+ fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table
for " + keys.size() + " key : ", ioe);
+ } finally {
+ if (!reuse) {
+ close(Pair.of(partitionFileSlicePair.getLeft(),
partitionFileSlicePair.getRight().getFileId()));
+ }
}
- }
+ });
+
+ ValidationUtils.checkState(keys.size() == fileSlicesKeysCount.get());
Review comment:
Is there a chance that a key is deleted (let's say a file is deleted)
and getRecordsByKeys() gets called with an invalid/deleted key? Wouldn't
that return empty when we do readFromBaseAndMergeWithLogRecords? So the total
number of incoming keys might differ from the result list, is that right?
Or do we at least return Option.empty, so that the total entries should match
here for such cases?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
}
/**
- * Returns a new pair of readers to the base and log files.
+ * Get the file slice details for the given key in a partition.
+ *
+ * @param partitionName - Metadata partition name
+ * @param key - Key to get the file slice for
+ * @return Partition and file slice pair for the given key
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReadersIfNeeded(String key, String partitionName) {
- return partitionReaders.computeIfAbsent(partitionName, k -> {
- try {
- final long baseFileOpenMs;
- final long logScannerOpenMs;
- HoodieFileReader baseFileReader = null;
- HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+ private Pair<String, FileSlice> getPartitionFileSlice(final String
partitionName, final String key) {
+ // Metadata is in sync till the latest completed instant on the dataset
+ List<FileSlice> latestFileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
partitionName);
+
+ final FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+ latestFileSlices.size()));
+ return Pair.of(partitionName, slice);
+ }
+
+ /**
+ * Get the latest file slices for the interested keys in a given partition.
+ *
+ * @param partitionName - Partition to get the file slices from
+ * @param keys - Interested keys
+ * @return FileSlices for the keys
+ */
+ private Map<Pair<String, FileSlice>, List<String>>
getPartitionFileSlices(final String partitionName, final List<String> keys) {
Review comment:
Can we find the right name for this method? It is essentially mapping
keys to file slices, but the return value is mapped the reverse way, and
getPartitionFileSlices() does not convey that.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -79,14 +98,53 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
}
}
+ /**
+ * Convert commit action to metadata records for the enabled partition types.
+ *
+ * @param commitMetadata - Commit action metadata
+ * @param dataMetaClient - Meta client for the data
table
+ * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta
indexing?
+ * @param instantTime - Action instant time
+ * @return Map of partition to metadata records for the commit action
+ */
+ public static Map<MetadataPartitionType, HoodieData<HoodieRecord>>
convertMetadataToRecords(
+ HoodieEngineContext context, List<MetadataPartitionType>
enabledPartitionTypes,
+ HoodieCommitMetadata commitMetadata, HoodieTableMetaClient
dataMetaClient,
+ boolean isMetaIndexColumnStatsForAllColumns, String instantTime) {
+ final Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordsMap = new HashMap<>();
+ final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
context.parallelize(
+ convertMetadataToFilesPartitionRecords(commitMetadata, instantTime),
1);
+ partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecordsRDD);
+
+ if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
+ final List<HoodieRecord> metadataBloomFilterRecords =
convertMetadataToBloomFilterRecords(commitMetadata,
+ dataMetaClient, instantTime);
+ if (!metadataBloomFilterRecords.isEmpty()) {
+ final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
context.parallelize(metadataBloomFilterRecords, 1);
+ partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS,
metadataBloomFilterRecordsRDD);
+ }
+ }
+
+ if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
+ final List<HoodieRecord> metadataColumnStats =
convertMetadataToColumnStatsRecords(commitMetadata, context,
Review comment:
same comment as above.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
- HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
- HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
+ "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+ final String bloomFilterKey = new
PartitionIndexID(partitionName).asBase64EncodedString()
+ .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+ HoodieKey key = new HoodieKey(bloomFilterKey,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+
+ // TODO: Get the bloom filter type from the file
+ HoodieMetadataBloomFilter metadataBloomFilter =
+ new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
Review comment:
If someone has explicitly set SIMPLE as the bloom filter type for the data
table, this might crash, right? It would probably be good to fix that in this
patch itself.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, HoodieMetadataCon
.getAllFilesInPartitions(partitions);
}
+ @Override
+ public Option<ByteBuffer> getBloomFilter(final String partitionName, final
String fileName)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Option.empty();
+ }
+
+ final Pair<String, String> partitionFileName = Pair.of(partitionName,
fileName);
+ Map<Pair<String, String>, ByteBuffer> bloomFilters =
getBloomFilters(Collections.singletonList(partitionFileName));
+ if (bloomFilters.isEmpty()) {
+ LOG.error("Meta index: missing bloom filter for partition: " +
partitionName + ", file: " + fileName);
+ return Option.empty();
+ }
+
+ ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+ return Option.of(bloomFilters.get(partitionFileName));
+ }
+
+ @Override
+ public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Collections.emptyMap();
+ }
+ if (partitionNameFileNameList.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+ Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+ final String bloomKey = new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+ .concat(new
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+ partitionIDFileIDSortedStrings.add(bloomKey);
+ fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+ }
+ );
+
+ List<String> partitionIDFileIDStrings = new
ArrayList<>(partitionIDFileIDSortedStrings);
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
hoodieRecordList =
+ getRecordsByKeys(partitionIDFileIDStrings,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
+ (timer.endTimer() / partitionIDFileIDStrings.size())));
+
+ Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new
HashMap<>();
+ for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry
: hoodieRecordList) {
+ if (entry.getRight().isPresent()) {
+ final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+ entry.getRight().get().getData().getBloomFilterMetadata();
+ if (bloomFilterMetadata.isPresent()) {
+ if (!bloomFilterMetadata.get().getIsDeleted()) {
+
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()),
bloomFilterMetadata.get().getBloomFilter());
+ }
+ } else {
+ LOG.error("Meta index bloom filter missing for: " +
fileToKeyMap.get(entry.getLeft()));
+ }
+ }
+ }
+ return partitionFileToBloomFilterMap;
+ }
+
+ @Override
+ public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+ throws HoodieMetadataException {
+ if (!isColumnStatsIndexEnabled) {
+ LOG.error("Metadata column stats index is disabled!");
+ return Collections.emptyMap();
+ }
+
+ Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new
HashMap<>();
+ TreeSet<String> sortedKeys = new TreeSet<>();
+ final String columnIndexStr = new
ColumnIndexID(columnName).asBase64EncodedString();
+ for (Pair<String, String> partitionNameFileNamePair :
partitionNameFileNameList) {
+ final String columnStatIndexKey = columnIndexStr
+ .concat(new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString())
Review comment:
Same comment as above. I guess we have a static method in
HoodieMetadataPayload to construct the key; can we use the same?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -30,16 +28,21 @@
private final String columnName;
private final T minValue;
private final T maxValue;
- private final long numNulls;
- private final PrimitiveStringifier stringifier;
+ private final long nullCount;
+ private final long valueCount;
+ private final long totalSize;
+ private final long totalUncompressedSize;
- public HoodieColumnRangeMetadata(final String filePath, final String
columnName, final T minValue, final T maxValue, final long numNulls, final
PrimitiveStringifier stringifier) {
+ public HoodieColumnRangeMetadata(final String filePath, final String
columnName, final T minValue, final T maxValue,
Review comment:
Should we rename this class to HoodieColumnStatsMetadata or something similar?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
}
/**
- * Returns a new pair of readers to the base and log files.
+ * Get the file slice details for the given key in a partition.
+ *
+ * @param partitionName - Metadata partition name
+ * @param key - Key to get the file slice for
+ * @return Partition and file slice pair for the given key
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReadersIfNeeded(String key, String partitionName) {
- return partitionReaders.computeIfAbsent(partitionName, k -> {
- try {
- final long baseFileOpenMs;
- final long logScannerOpenMs;
- HoodieFileReader baseFileReader = null;
- HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+ private Pair<String, FileSlice> getPartitionFileSlice(final String
partitionName, final String key) {
+ // Metadata is in sync till the latest completed instant on the dataset
+ List<FileSlice> latestFileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
partitionName);
+
+ final FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+ latestFileSlices.size()));
+ return Pair.of(partitionName, slice);
+ }
+
+ /**
+ * Get the latest file slices for the interested keys in a given partition.
+ *
+ * @param partitionName - Partition to get the file slices from
+ * @param keys - Interested keys
+ * @return FileSlices for the keys
+ */
+ private Map<Pair<String, FileSlice>, List<String>>
getPartitionFileSlices(final String partitionName, final List<String> keys) {
+ // Metadata is in sync till the latest completed instant on the dataset
+ List<FileSlice> latestFileSlices =
Review comment:
Something to follow up on after this patch, maybe.
I see we call getPartitionLatestMergedFileSlices and getPartitionFileSlices
in a few places, and the calls could be repeated as well. Can we cache the
return value based on the latest instant? If the latest instant has not
changed, then latestFileSlices is not going to change, right? So we might as
well use the cached copy if we have one.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -79,14 +98,53 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
}
}
+ /**
+ * Convert commit action to metadata records for the enabled partition types.
+ *
+ * @param commitMetadata - Commit action metadata
+ * @param dataMetaClient - Meta client for the data
table
+ * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta
indexing?
+ * @param instantTime - Action instant time
+ * @return Map of partition to metadata records for the commit action
+ */
+ public static Map<MetadataPartitionType, HoodieData<HoodieRecord>>
convertMetadataToRecords(
+ HoodieEngineContext context, List<MetadataPartitionType>
enabledPartitionTypes,
+ HoodieCommitMetadata commitMetadata, HoodieTableMetaClient
dataMetaClient,
+ boolean isMetaIndexColumnStatsForAllColumns, String instantTime) {
+ final Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordsMap = new HashMap<>();
+ final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
context.parallelize(
+ convertMetadataToFilesPartitionRecords(commitMetadata, instantTime),
1);
+ partitionToRecordsMap.put(MetadataPartitionType.FILES,
filesPartitionRecordsRDD);
+
+ if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
+ final List<HoodieRecord> metadataBloomFilterRecords =
convertMetadataToBloomFilterRecords(commitMetadata,
Review comment:
If this is happening in the driver, can we try to parallelize it across
executors? Reading bloom filters could add some latency if there are too many
files.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
- HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
- HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
+ "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+ final String bloomFilterKey = new
PartitionIndexID(partitionName).asBase64EncodedString()
Review comment:
Can we use the same name everywhere? Maybe bloomFilterIndexKey.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
}
/**
- * Returns a new pair of readers to the base and log files.
+ * Get the file slice details for the given key in a partition.
+ *
+ * @param partitionName - Metadata partition name
+ * @param key - Key to get the file slice for
+ * @return Partition and file slice pair for the given key
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReadersIfNeeded(String key, String partitionName) {
- return partitionReaders.computeIfAbsent(partitionName, k -> {
- try {
- final long baseFileOpenMs;
- final long logScannerOpenMs;
- HoodieFileReader baseFileReader = null;
- HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+ private Pair<String, FileSlice> getPartitionFileSlice(final String
partitionName, final String key) {
Review comment:
I think this method is not used anywhere. Can you check?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -124,14 +182,111 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
return records;
}
+ /**
+ * Convert commit action metadata to bloom filter records.
+ *
+ * @param commitMetadata - Commit action metadata
+ * @param dataMetaClient - Meta client for the data table
+ * @param instantTime - Action instant time
+ * @return List of metadata table records
+ */
+ public static List<HoodieRecord>
convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
+
HoodieTableMetaClient dataMetaClient,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((partitionStatName,
writeStats) -> {
Review comment:
If the commit metadata has only log files, we won't return anything from
this method, right? Can you confirm that our writes still go through? Or do we
have any check that the returned list should match the number of files in the
commit metadata, or at least not be empty?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +616,88 @@ private static void
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
return records;
}
+ /**
+ * Convert rollback action metadata to bloom filter index records.
+ */
+ private static List<HoodieRecord>
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+
HoodieTableMetaClient dataMetaClient,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
+ if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+ return;
+ }
+
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
+ }));
+
+ partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ appendedFileMap.forEach((appendedFile, length) -> {
+ if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+ return;
+ }
+ final String pathWithPartition = partitionName + "/" + appendedFile;
+ final Path appendedFilePath = new Path(dataMetaClient.getBasePath(),
pathWithPartition);
+ try {
+ HoodieFileReader<IndexedRecord> fileReader =
+
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(),
appendedFilePath);
+ final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+ if (fileBloomFilter == null) {
+ LOG.error("Failed to read bloom filter for " + appendedFilePath);
+ return;
+ }
+ ByteBuffer bloomByteBuffer =
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+ HoodieRecord record =
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ partition, appendedFile, instantTime, bloomByteBuffer, false);
+ records.add(record);
+ fileReader.close();
+ } catch (IOException e) {
+ LOG.error("Failed to get bloom filter for file: " +
appendedFilePath);
+ }
+ });
+ });
+ return records;
+ }
+
+ /**
+ * Convert rollback action metadata to column stats index records.
+ */
+ private static List<HoodieRecord>
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+
HoodieTableMetaClient datasetMetaClient,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+ String
instantTime) {
+ List<HoodieRecord> records = new LinkedList<>();
+ List<String> latestColumns = getLatestColumns(datasetMetaClient);
+ partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
Review comment:
I left this comment for the bloom filter as well. Since reading column
stats could be latency sensitive if there is a large number of files in the
commit metadata, is there a possibility to parallelize the reads across
different files?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String
recordKey, int numFileGrou
return fileSliceStream.sorted((s1, s2) ->
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
}
+ public static List<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+
HoodieEngineContext engineContext,
+
HoodieTableMetaClient dataMetaClient,
+ boolean
isMetaIndexColumnStatsForAllColumns,
+ String
instantTime) {
+
+ try {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+ return
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext,
dataMetaClient, allWriteStats,
+ isMetaIndexColumnStatsForAllColumns);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for
metadata table ", e);
+ }
+ }
+
+ /**
+ * Create column stats from write status.
+ *
+ * @param engineContext - Enging context
+ * @param datasetMetaClient - Dataset meta client
+ * @param allWriteStats - Write status to convert
+ * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for
indexing
+ */
+ public static List<HoodieRecord>
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+
HoodieTableMetaClient datasetMetaClient,
+
List<HoodieWriteStat> allWriteStats,
+ boolean
isMetaIndexColumnStatsForAllColumns) throws Exception {
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<HoodieWriteStat> prunedWriteStats =
allWriteStats.stream().filter(writeStat -> {
+ return !(writeStat instanceof HoodieDeltaWriteStat);
+ }).collect(Collectors.toList());
+ if (prunedWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return engineContext.flatMap(prunedWriteStats,
+ writeStat -> translateWriteStatToColumnStats(writeStat,
datasetMetaClient,
+ getLatestColumns(datasetMetaClient,
isMetaIndexColumnStatsForAllColumns)),
+ prunedWriteStats.size());
+ }
+
+ /**
+ * Get the latest columns for the table for column stats indexing.
+ *
+ * @param datasetMetaClient - Data table meta client
+ * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing
enabled for all columns
+ */
+ private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+ if (!isMetaIndexColumnStatsForAllColumns
+ ||
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
< 1) {
+ return
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+ }
+
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(datasetMetaClient);
+ // consider nested fields as well. if column stats is enabled only for a
subset of columns,
+ // directly use them instead of all columns from the latest table schema
+ try {
+ return schemaResolver.getTableAvroSchema().getFields().stream()
+ .map(entry -> entry.name()).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get latest columns for " +
datasetMetaClient.getBasePath());
+ }
+ }
+
+ private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient) {
+ return getLatestColumns(datasetMetaClient, false);
+ }
+
+ public static Stream<HoodieRecord>
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
+
HoodieTableMetaClient datasetMetaClient,
+
List<String> latestColumns) {
+ return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(),
datasetMetaClient, latestColumns, false);
+
+ }
+
+ public static Stream<HoodieRecord> getColumnStats(final String
partitionPath, final String filePathWithPartition,
Review comment:
Can you please revisit the access specifiers for the newly added methods?
IntelliJ might highlight these in yellow. I don't think they need to be
public.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String
recordKey, int numFileGrou
return fileSliceStream.sorted((s1, s2) ->
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
}
+ public static List<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+
HoodieEngineContext engineContext,
+
HoodieTableMetaClient dataMetaClient,
+ boolean
isMetaIndexColumnStatsForAllColumns,
+ String
instantTime) {
+
+ try {
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+ return
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext,
dataMetaClient, allWriteStats,
+ isMetaIndexColumnStatsForAllColumns);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for
metadata table ", e);
+ }
+ }
+
+ /**
+ * Create column stats from write status.
+ *
+ * @param engineContext - Enging context
+ * @param datasetMetaClient - Dataset meta client
+ * @param allWriteStats - Write status to convert
+ * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for
indexing
+ */
+ public static List<HoodieRecord>
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+
HoodieTableMetaClient datasetMetaClient,
+
List<HoodieWriteStat> allWriteStats,
+ boolean
isMetaIndexColumnStatsForAllColumns) throws Exception {
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<HoodieWriteStat> prunedWriteStats =
allWriteStats.stream().filter(writeStat -> {
+ return !(writeStat instanceof HoodieDeltaWriteStat);
+ }).collect(Collectors.toList());
+ if (prunedWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ return engineContext.flatMap(prunedWriteStats,
+ writeStat -> translateWriteStatToColumnStats(writeStat,
datasetMetaClient,
+ getLatestColumns(datasetMetaClient,
isMetaIndexColumnStatsForAllColumns)),
+ prunedWriteStats.size());
+ }
+
+ /**
+ * Get the latest columns for the table for column stats indexing.
+ *
+ * @param datasetMetaClient - Data table meta client
+ * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing
enabled for all columns
+ */
+ private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+ if (!isMetaIndexColumnStatsForAllColumns
+ ||
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
< 1) {
+ return
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+ }
+
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(datasetMetaClient);
+ // consider nested fields as well. if column stats is enabled only for a
subset of columns,
Review comment:
Do we have a tracking JIRA for this?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, HoodieMetadataCon
.getAllFilesInPartitions(partitions);
}
+ @Override
+ public Option<ByteBuffer> getBloomFilter(final String partitionName, final
String fileName)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Option.empty();
+ }
+
+ final Pair<String, String> partitionFileName = Pair.of(partitionName,
fileName);
+ Map<Pair<String, String>, ByteBuffer> bloomFilters =
getBloomFilters(Collections.singletonList(partitionFileName));
+ if (bloomFilters.isEmpty()) {
+ LOG.error("Meta index: missing bloom filter for partition: " +
partitionName + ", file: " + fileName);
+ return Option.empty();
+ }
+
+ ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+ return Option.of(bloomFilters.get(partitionFileName));
+ }
+
+ @Override
+ public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Collections.emptyMap();
+ }
+ if (partitionNameFileNameList.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+ Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+ final String bloomKey = new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+ .concat(new
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+ partitionIDFileIDSortedStrings.add(bloomKey);
+ fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+ }
+ );
+
+ List<String> partitionIDFileIDStrings = new
ArrayList<>(partitionIDFileIDSortedStrings);
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
hoodieRecordList =
+ getRecordsByKeys(partitionIDFileIDStrings,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
+ (timer.endTimer() / partitionIDFileIDStrings.size())));
+
+ Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new
HashMap<>();
+ for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry
: hoodieRecordList) {
+ if (entry.getRight().isPresent()) {
+ final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+ entry.getRight().get().getData().getBloomFilterMetadata();
+ if (bloomFilterMetadata.isPresent()) {
+ if (!bloomFilterMetadata.get().getIsDeleted()) {
+
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()),
bloomFilterMetadata.get().getBloomFilter());
+ }
+ } else {
+ LOG.error("Meta index bloom filter missing for: " +
fileToKeyMap.get(entry.getLeft()));
+ }
+ }
+ }
+ return partitionFileToBloomFilterMap;
+ }
+
+ @Override
+ public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+ throws HoodieMetadataException {
+ if (!isColumnStatsIndexEnabled) {
+ LOG.error("Metadata column stats index is disabled!");
+ return Collections.emptyMap();
+ }
+
+ Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new
HashMap<>();
+ TreeSet<String> sortedKeys = new TreeSet<>();
+ final String columnIndexStr = new
ColumnIndexID(columnName).asBase64EncodedString();
+ for (Pair<String, String> partitionNameFileNamePair :
partitionNameFileNameList) {
+ final String columnStatIndexKey = columnIndexStr
Review comment:
I guess we are using "Index" everywhere, so can we add an "s" to the name?
(columnStat**s**IndexKey)
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, HoodieMetadataCon
.getAllFilesInPartitions(partitions);
}
+ @Override
+ public Option<ByteBuffer> getBloomFilter(final String partitionName, final
String fileName)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Option.empty();
+ }
+
+ final Pair<String, String> partitionFileName = Pair.of(partitionName,
fileName);
+ Map<Pair<String, String>, ByteBuffer> bloomFilters =
getBloomFilters(Collections.singletonList(partitionFileName));
+ if (bloomFilters.isEmpty()) {
+ LOG.error("Meta index: missing bloom filter for partition: " +
partitionName + ", file: " + fileName);
+ return Option.empty();
+ }
+
+ ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+ return Option.of(bloomFilters.get(partitionFileName));
+ }
+
+ @Override
+ public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Collections.emptyMap();
+ }
+ if (partitionNameFileNameList.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+ Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+ final String bloomKey = new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
Review comment:
Can we use the static method getBloomFilterIndexKey() in
HoodieMetadataPayload to construct the key wherever needed?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFi
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(partition, new
HoodieMetadataFileInfo(0L, false)));
- HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+ fileInfo);
return new HoodieRecord<>(key, payload);
}
/**
* Create and return a {@code HoodieMetadataPayload} to save list of files
within a partition.
*
- * @param partition The name of the partition
- * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
+ * @param partition The name of the partition
+ * @param filesAdded Mapping of files to their sizes for files which have
been added to this partition
* @param filesDeleted List of files which have been deleted from this
partition
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
-
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+
Option<Map<String, Long>> filesAdded,
+
Option<List<String>> filesDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
filesAdded.ifPresent(
m -> m.forEach((filename, size) -> fileInfo.put(filename, new
HoodieMetadataFileInfo(size, false))));
filesDeleted.ifPresent(
- m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
+ m -> m.forEach(filename -> fileInfo.put(filename, new
HoodieMetadataFileInfo(0L, true))));
- HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.partitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+ HoodieKey key = new HoodieKey(partition,
MetadataPartitionType.FILES.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
return new HoodieRecord<>(key, payload);
}
+ /**
+ * Create bloom filter metadata record.
+ *
+ * @param partitionName - Partition name
+ * @param baseFileName - Base file name for which the bloom filter needs to
persisted
+ * @param timestamp - Instant timestamp responsible for this record
+ * @param bloomFilter - Bloom filter for the File
+ * @param isDeleted - Is the bloom filter no more valid
+ * @return Metadata payload containing the fileID and its bloom filter record
+ */
+ public static HoodieRecord<HoodieMetadataPayload>
createBloomFilterMetadataRecord(final String partitionName,
+
final String baseFileName,
+
final String timestamp,
+
final ByteBuffer bloomFilter,
+
final boolean isDeleted) {
+ ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+ && FSUtils.isBaseFile(new Path(baseFileName)),
+ "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+ final String bloomFilterKey = new
PartitionIndexID(partitionName).asBase64EncodedString()
+ .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+ HoodieKey key = new HoodieKey(bloomFilterKey,
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+
+ // TODO: Get the bloom filter type from the file
+ HoodieMetadataBloomFilter metadataBloomFilter =
+ new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+ timestamp, bloomFilter, isDeleted);
+ HoodieMetadataPayload metadataPayload = new
HoodieMetadataPayload(key.getRecordKey(),
+ HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+ return new HoodieRecord<>(key, metadataPayload);
+ }
+
@Override
public HoodieMetadataPayload preCombine(HoodieMetadataPayload
previousRecord) {
ValidationUtils.checkArgument(previousRecord.type == type,
- "Cannot combine " + previousRecord.type + " with " + type);
-
- Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+ "Cannot combine " + previousRecord.type + " with " + type);
switch (type) {
- case PARTITION_LIST:
- case FILE_LIST:
- combinedFileInfo = combineFilesystemMetadata(previousRecord);
- break;
+ case METADATA_TYPE_PARTITION_LIST:
+ case METADATA_TYPE_FILE_LIST:
+ Map<String, HoodieMetadataFileInfo> combinedFileInfo =
combineFilesystemMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type, combinedFileInfo);
+ case METADATA_TYPE_BLOOM_FILTER:
+ HoodieMetadataBloomFilter combineBloomFilterMetadata =
combineBloomFilterMetadata(previousRecord);
+ return new HoodieMetadataPayload(key, type,
combineBloomFilterMetadata);
+ case METADATA_TYPE_COLUMN_STATS:
+ return new HoodieMetadataPayload(key, type,
combineColumnStatsMetadatat(previousRecord));
default:
throw new HoodieMetadataException("Unknown type of
HoodieMetadataPayload: " + type);
}
+ }
+
+ private HoodieMetadataBloomFilter
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+ return this.bloomFilterMetadata;
+ }
- return new HoodieMetadataPayload(key, type, combinedFileInfo);
+ private HoodieColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload
previousRecord) {
+ return this.columnStatMetadata;
Review comment:
Actually, my bad. Shouldn't we be combining at the file level?
For example:
metadata record1:
file1: stats1_1
file2: stats2_1
metadata record2:
file1: stats1_2
file3: stats3_1
What will be the final value after combining both of these records?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext
engineContext, HoodieMetadataCon
.getAllFilesInPartitions(partitions);
}
+ @Override
+ public Option<ByteBuffer> getBloomFilter(final String partitionName, final
String fileName)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Option.empty();
+ }
+
+ final Pair<String, String> partitionFileName = Pair.of(partitionName,
fileName);
+ Map<Pair<String, String>, ByteBuffer> bloomFilters =
getBloomFilters(Collections.singletonList(partitionFileName));
+ if (bloomFilters.isEmpty()) {
+ LOG.error("Meta index: missing bloom filter for partition: " +
partitionName + ", file: " + fileName);
+ return Option.empty();
+ }
+
+ ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+ return Option.of(bloomFilters.get(partitionFileName));
+ }
+
+ @Override
+ public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList)
+ throws HoodieMetadataException {
+ if (!isBloomFilterIndexEnabled) {
+ LOG.error("Metadata bloom filter index is disabled!");
+ return Collections.emptyMap();
+ }
+ if (partitionNameFileNameList.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+ Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+ partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+ final String bloomKey = new
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
Review comment:
or bloomFilterIndexKey
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]