danny0405 commented on code in PR #11146:
URL: https://github.com/apache/hudi/pull/11146#discussion_r1609596497
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -540,6 +547,51 @@ private HoodieFunctionalIndexDefinition
getFunctionalIndexDefinition(String inde
}
}
+ private Set<String> getSecondaryIndexPartitionsToInit() {
+ Set<String> secondaryIndexPartitions =
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().values().stream()
+ .map(HoodieFunctionalIndexDefinition::getIndexName)
+ .filter(indexName ->
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
Review Comment:
So secondary index belongs to functional index?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -410,6 +413,14 @@ private boolean initializeFromFilesystem(String
initializationTime, List<Metadat
}
fileGroupCountAndRecordsPair =
initializePartitionStatsIndex(partitionInfoList);
break;
+ case SECONDARY_INDEX:
+ Set<String> secondaryIndexPartitionsToInit =
getSecondaryIndexPartitionsToInit();
+ if (secondaryIndexPartitionsToInit.isEmpty()) {
+ continue;
+ }
+ ValidationUtils.checkState(secondaryIndexPartitionsToInit.size()
== 1, "Only one secondary index at a time is supported for now");
Review Comment:
Not sure why we have this number limit.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFunctionalIndexDefinition.java:
##########
@@ -52,7 +55,7 @@ public HoodieFunctionalIndexDefinition(String indexName,
String indexType, Strin
Map<String, String> indexOptions) {
this.indexName = indexName;
this.indexType = indexType;
- this.indexFunction = indexFunction;
+ this.indexFunction = nonEmpty(indexFunction) ? indexFunction :
SPARK_IDENTITY;
Review Comment:
Can you elaborate the semantics for empty index function?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -783,4 +784,101 @@ public int
getNumFileGroupsForPartition(MetadataPartitionType partition) {
metadataFileSystemView, partition.getPartitionPath()));
return partitionFileSliceMap.get(partition.getPartitionPath()).size();
}
+
+ @Override
+ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String>
recordKeys, String partitionName) {
+ if (recordKeys.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
Review Comment:
Can we just return empty instead of throwing?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -111,6 +111,14 @@ static boolean isMetadataTable(String basePath) {
return basePath.endsWith(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH);
}
+ static boolean isMetadataTableSecondaryIndexPartition(String basePath,
Option<String> partitionName) {
+ if (!isMetadataTable(basePath) || !partitionName.isPresent()) {
+ return false;
+ }
+
+ return partitionName.get().startsWith("secondary_index_");
Review Comment:
It would be great if we could avoid the hard-coded string.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -977,16 +1014,17 @@ public void updateFromWriteStatuses(HoodieCommitMetadata
commitMetadata, HoodieD
Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
- enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
- dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
+ enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+ dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpserts(writeStatus);
HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX,
updatesFromWriteStatuses.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
+ updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
writeStatus);
Review Comment:
Hmm, I see it at the bottom.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -948,9 +985,9 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime);
// return early and populate enabledPartitionTypes correctly (check in
initialCommit)
- MetadataPartitionType partitionType =
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX)
- ? FUNCTIONAL_INDEX
- :
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ MetadataPartitionType partitionType =
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX)
? FUNCTIONAL_INDEX :
+
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX)
? SECONDARY_INDEX :
Review Comment:
ditto, holding some variables in partition type may simplify the code.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf);
}
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+ dataMetaClient.getTableConfig().getMetadataPartitions()
Review Comment:
Maybe we can share code with RLI.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf);
}
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+ dataMetaClient.getTableConfig().getMetadataPartitions()
+ .stream()
+ .filter(partition ->
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .forEach(partition -> {
+ HoodieData<HoodieRecord> secondaryIndexRecords;
+ try {
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, writeStatus);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
+ }
+ partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+ });
+ }
+
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+ // Build a list of basefiles+delta-log-files for every partition that this
commit touches
+ // {
+ // {
+ // "partition1", { {"baseFile11", {"logFile11", "logFile12"}},
{"baseFile12", {"logFile11"} } },
+ // },
+ // {
+ // "partition2", { {"baseFile21", {"logFile21", "logFile22"}},
{"baseFile22", {"logFile21"} } }
+ // }
+ // }
+ List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new
ArrayList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((dataPartition,
writeStats) -> {
+ writeStats.forEach(writeStat -> {
+ if (writeStat instanceof HoodieDeltaWriteStat) {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(),
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+ } else {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(writeStat.getPath(), new ArrayList<>())));
Review Comment:
`Collections.emptyList()` is always better.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -437,9 +448,7 @@ private boolean initializeFromFilesystem(String
initializationTime, List<Metadat
HoodieData<HoodieRecord> records =
fileGroupCountAndRecordsPair.getValue();
bulkCommit(commitTimeForPartition, partitionType, records,
fileGroupCount);
metadataMetaClient.reloadActiveTimeline();
- String partitionPath = partitionType == FUNCTIONAL_INDEX
- ? dataWriteConfig.getFunctionalIndexConfig().getIndexName()
- : partitionType.getPartitionPath();
+ String partitionPath = (partitionType == FUNCTIONAL_INDEX ||
partitionType == SECONDARY_INDEX) ?
dataWriteConfig.getFunctionalIndexConfig().getIndexName() :
partitionType.getPartitionPath();
Review Comment:
Maybe we can extend the partition type to hold some variable flags inside.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -977,16 +1014,17 @@ public void updateFromWriteStatuses(HoodieCommitMetadata
commitMetadata, HoodieD
Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
- enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
- dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
+ enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+ dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpserts(writeStatus);
HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX,
updatesFromWriteStatuses.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
+ updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
writeStatus);
Review Comment:
Do we have a retraction issue similar to RLI? Should we union the
retracted records too?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -540,6 +547,51 @@ private HoodieFunctionalIndexDefinition
getFunctionalIndexDefinition(String inde
}
}
+ private Set<String> getSecondaryIndexPartitionsToInit() {
+ Set<String> secondaryIndexPartitions =
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().values().stream()
+ .map(HoodieFunctionalIndexDefinition::getIndexName)
+ .filter(indexName ->
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .collect(Collectors.toSet());
+ Set<String> completedMetadataPartitions =
dataMetaClient.getTableConfig().getMetadataPartitions();
+ secondaryIndexPartitions.removeAll(completedMetadataPartitions);
+ return secondaryIndexPartitions;
+ }
+
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeSecondaryIndexPartition(String indexName) throws IOException {
+ HoodieFunctionalIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexName);
+ ValidationUtils.checkState(indexDefinition != null, "Secondary Index
definition is not present for index " + indexName);
+ List<Pair<String, FileSlice>> partitionFileSlicePairs =
getPartitionFileSlicePairs();
+
+ // Reuse record index parallelism config to build secondary index
+ int parallelism = Math.min(partitionFileSlicePairs.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+ engineContext,
+ partitionFileSlicePairs,
+ parallelism,
+ this.getClass().getSimpleName(),
+ dataMetaClient,
+ EngineType.SPARK,
Review Comment:
I don't think we should declare the engine type in the abstract class.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -351,6 +351,25 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating partition stats
index.");
+ public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.secondary.enable")
+ .defaultValue(false)
+ .sinceVersion("1.0.0")
+ .withDocumentation("Enable secondary index within the Metadata Table.");
+
+ public static final ConfigProperty<String> SECONDARY_INDEX_COLUMN =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.secondary.column")
+ .noDefaultValue()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Enable secondary index within the Metadata Table.");
Review Comment:
Fix the doc.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1054,6 +1092,75 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf);
}
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+ dataMetaClient.getTableConfig().getMetadataPartitions()
+ .stream()
+ .filter(partition ->
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .forEach(partition -> {
+ HoodieData<HoodieRecord> secondaryIndexRecords;
+ try {
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, writeStatus);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
+ }
+ partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+ });
+ }
+
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+ // Build a list of basefiles+delta-log-files for every partition that this
commit touches
+ // {
+ // {
+ // "partition1", { {"baseFile11", {"logFile11", "logFile12"}},
{"baseFile12", {"logFile11"} } },
+ // },
+ // {
+ // "partition2", { {"baseFile21", {"logFile21", "logFile22"}},
{"baseFile22", {"logFile21"} } }
+ // }
+ // }
+ List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new
ArrayList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((dataPartition,
writeStats) -> {
+ writeStats.forEach(writeStat -> {
+ if (writeStat instanceof HoodieDeltaWriteStat) {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(),
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+ } else {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(writeStat.getPath(), new ArrayList<>())));
+ }
+ });
+ });
+
+ // Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
+ // the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
+ // operation is going to be expensive (in CPU, memory and IO)
+ List<String> keysToRemove = new ArrayList<>();
+ writeStatus.collectAsList().forEach(status -> {
+ status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+ // Consider those keys which were either updated or deleted in this
commit
+ if (!recordDelegate.getNewLocation().isPresent() ||
(recordDelegate.getCurrentLocation().isPresent() &&
recordDelegate.getNewLocation().isPresent())) {
+ keysToRemove.add(recordDelegate.getRecordKey());
+ }
+ });
+ });
+ HoodieFunctionalIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexPartition);
+
+ // Fetch the secondary keys that each of the record keys ('keysToRemove')
maps to
+ // This is obtained by scanning the entire secondary index partition in
the metadata table
+ // This could be an expensive operation for a large commit
(updating/deleting millions of rows)
+ Map<String, String> recordKeySecondaryKeyMap =
metadata.getSecondaryKeys(keysToRemove);
+ HoodieData<HoodieRecord> deletedRecords =
getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap,
indexDefinition);
+
+ // Reuse record index parallelism config to build secondary index
+ int parallelism = Math.min(partitionFilePairs.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
Review Comment:
We had better introduce a new config for the secondary index; it looks like the
secondary index is an order of magnitude larger than the RLI index.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -295,6 +311,11 @@ protected HoodieMetadataPayload(String key, int type,
this.recordIndexMetadata = recordIndexMetadata;
}
+ public HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo
secondaryIndexMetadata) {
Review Comment:
Should this be public or private scope? Moving it to line 301 seems better.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -540,6 +547,51 @@ private HoodieFunctionalIndexDefinition
getFunctionalIndexDefinition(String inde
}
}
+ private Set<String> getSecondaryIndexPartitionsToInit() {
+ Set<String> secondaryIndexPartitions =
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().values().stream()
+ .map(HoodieFunctionalIndexDefinition::getIndexName)
+ .filter(indexName ->
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
Review Comment:
Looks like we have some code that can share with functional index.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -783,4 +784,101 @@ public int
getNumFileGroupsForPartition(MetadataPartitionType partition) {
metadataFileSystemView, partition.getPartitionPath()));
return partitionFileSliceMap.get(partition.getPartitionPath()).size();
}
+
+ @Override
+ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String>
recordKeys, String partitionName) {
+ if (recordKeys.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+
+ // Lookup keys from each file slice
+ // TODO: parallelize this loop
+ Map<String, String> reverseSecondaryKeyMap = new HashMap<>();
+ for (FileSlice partition : partitionFileSlices) {
+ reverseLookupSecondaryKeys(partitionName, recordKeys, partition,
reverseSecondaryKeyMap);
+ }
+
+ return reverseSecondaryKeyMap;
+ }
+
+ private void reverseLookupSecondaryKeys(String partitionName, List<String>
recordKeys, FileSlice fileSlice, Map<String, String> recordKeyMap) {
+ Set<String> keySet = new HashSet<>(recordKeys.size());
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
Review Comment:
Looks like we have a lot of code duplicated with RLI.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -783,4 +784,101 @@ public int
getNumFileGroupsForPartition(MetadataPartitionType partition) {
metadataFileSystemView, partition.getPartitionPath()));
return partitionFileSliceMap.get(partition.getPartitionPath()).size();
}
+
+ @Override
+ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String>
recordKeys, String partitionName) {
+ if (recordKeys.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
+ List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
+ k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
+ final int numFileSlices = partitionFileSlices.size();
+ ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for
partition " + partitionName + " should be > 0");
+
+ // Lookup keys from each file slice
+ // TODO: parallelize this loop
+ Map<String, String> reverseSecondaryKeyMap = new HashMap<>();
+ for (FileSlice partition : partitionFileSlices) {
+ reverseLookupSecondaryKeys(partitionName, recordKeys, partition,
reverseSecondaryKeyMap);
+ }
+
+ return reverseSecondaryKeyMap;
+ }
+
+ private void reverseLookupSecondaryKeys(String partitionName, List<String>
recordKeys, FileSlice fileSlice, Map<String, String> recordKeyMap) {
+ Set<String> keySet = new HashSet<>(recordKeys.size());
+ Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
+ Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
getOrCreateReaders(partitionName, fileSlice);
+ try {
+ HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+ HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+ if (baseFileReader == null && logRecordScanner == null) {
+ return;
+ }
+
+ // Sort it here once so that we don't need to sort individually for base
file and for each individual log files.
+ List<String> sortedKeys = new ArrayList<>(recordKeys);
+ Collections.sort(sortedKeys);
+ keySet.addAll(sortedKeys);
+
+ logRecordScanner.getRecords().forEach(record -> {
+ HoodieMetadataPayload payload = record.getData();
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ });
+
+ // Map of (record-key, secondary-index-record)
+ Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords =
fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName);
+
+ // Iterate over all provided log-records, merging them into existing
records
+ logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(
+ key1,
+ value1,
+ (oldRecord, newRecord) -> {
+ Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
+ HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord,
newRecord);
+ return mergedRecord.orElseGet(null);
+ }
+ ));
+
+ baseFileRecords.forEach((key, value) -> {
+ recordKeyMap.put(key, value.getRecordKey());
+ });
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table
for " + recordKeys.size() + " key : ", ioe);
+ } finally {
+ if (!reuse) {
+ closeReader(readers);
+ }
+ }
+ }
+
+ private Map<String, HoodieRecord<HoodieMetadataPayload>>
fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader,
+
Set<String> keySet,
+
String partitionName) throws IOException {
+ if (reader == null) {
+ // No base file at all
+ return new HashMap<>();
Review Comment:
`Collections.emptyMap` is always better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]