codope commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r835763700
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -663,20 +711,82 @@ private MetadataRecordsGenerationParams
getRecordsGenerationParams() {
/**
* Processes commit metadata from data table and commits to metadata table.
+ *
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the
respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered.
false otherwise.
*/
private <T> void processAndCommit(String instantTime,
ConvertMetadataFunction convertMetadataFunction, boolean
canTriggerTableService) {
- if (enabled && metadata != null) {
- Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap
= convertMetadataFunction.convertMetadata();
- commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ if (!dataWriteConfig.isMetadataTableEnabled()) {
+ return;
+ }
+ Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+ partitionsToUpdate.forEach(p -> {
+ if (enabled && metadata != null) {
+ Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionRecordsMap = convertMetadataFunction.convertMetadata();
+ commit(instantTime, partitionRecordsMap, canTriggerTableService);
+ }
+ });
+ }
+
+ private Set<String> getMetadataPartitionsToUpdate() {
+ // fetch partitions to update from table config
+ Set<String> partitionsToUpdate =
Stream.of(dataMetaClient.getTableConfig().getCompletedMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet());
+
partitionsToUpdate.addAll(Stream.of(dataMetaClient.getTableConfig().getInflightMetadataIndexes().split(","))
+ .map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toSet()));
+ if (!partitionsToUpdate.isEmpty()) {
+ return partitionsToUpdate;
}
+ // fallback to update files partition only if table config returned no
partitions
+ partitionsToUpdate.add(MetadataPartitionType.FILES.getPartitionPath());
+ return partitionsToUpdate;
+ }
+
+ @Override
+ public void index(HoodieEngineContext engineContext,
List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+ if (indexPartitionInfos.isEmpty()) {
+ LOG.warn("No partition to index in the plan");
+ return;
+ }
+ String indexUptoInstantTime =
indexPartitionInfos.get(0).getIndexUptoInstant();
+ indexPartitionInfos.forEach(indexPartitionInfo -> {
+ String relativePartitionPath =
indexPartitionInfo.getMetadataPartitionPath();
+ LOG.info(String.format("Creating a new metadata index for partition '%s'
under path %s upto instant %s",
+ relativePartitionPath, metadataWriteConfig.getBasePath(),
indexUptoInstantTime));
+ try {
+ // filegroup should have already been initialized while scheduling
index for this partition
+ if (!dataMetaClient.getFs().exists(new
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
+ throw new HoodieIndexException(String.format("File group not
initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index
scheduling failed!",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+ } catch (IOException e) {
+ throw new HoodieIndexException(String.format("Unable to check whether
file group is initialized for metadata partition: %s, indexUptoInstant: %s",
+ relativePartitionPath, indexUptoInstantTime));
+ }
+
+ // return early and populate enabledPartitionTypes correctly (check in
initialCommit)
+ MetadataPartitionType partitionType =
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+ if (!enabledPartitionTypes.contains(partitionType)) {
+ throw new HoodieIndexException(String.format("Indexing for metadata
partition: %s is not enabled", partitionType));
+ }
+ });
+ // before initial commit update table config
+
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_INFLIGHT.key(),
indexPartitionInfos.stream()
Review comment:
hmm.. good point, we should append.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org