This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch partition-bucket-index in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 39f1481fb78a1adce63a3c271af35c6b55b5e207 Author: Danny Chan <[email protected]> AuthorDate: Tue Mar 18 05:38:25 2025 +0800 [HUDI-9175] Remove the unnecessary MDT metadata check for col_stats index config update (#12977) --- .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- .../hudi/client/HoodieColumnStatsIndexUtils.java | 45 +++++++++------------- .../action/commit/BaseCommitActionExecutor.java | 2 +- .../commit/BaseFlinkCommitActionExecutor.java | 3 +- .../org/apache/hudi/common/model/ActionType.java | 15 +++++++- .../hudi/metadata/HoodieTableMetadataUtil.java | 21 +++++----- .../org/apache/hudi/util/FlinkWriteClients.java | 1 + 7 files changed, 47 insertions(+), 42 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index bcd0b587ee9..efd94e90540 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -284,7 +284,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient writeTableMetadata(table, instantTime, metadata); activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime), Option.of(metadata)); // update cols to Index as applicable - HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, + HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, commitActionType, (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (metaClient, columnsToIndex) -> { updateColumnsToIndexWithColStats(metaClient, columnsToIndex); return null; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java index d24e70d65f2..e8c61ed87c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java @@ -19,17 +19,15 @@ package org.apache.hudi.client; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; -import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.HoodieTable; import java.util.ArrayList; @@ -47,32 +45,27 @@ public class HoodieColumnStatsIndexUtils { * @param dataTable {@link HoodieTable} of interest. * @param config {@link HoodieWriteConfig} of interest. * @param commitMetadata commit metadata of interest. - * @param updateColSatsFunc function to assist with updating columns to index. + * @param commitActionType commit action type to include interested actions. + * @param updateColStatsFunc function to assist with updating columns to index. */ @VisibleForTesting - public static void updateColsToIndex(HoodieTable dataTable, HoodieWriteConfig config, HoodieCommitMetadata commitMetadata, - Functions.Function2<HoodieTableMetaClient, List<String>, Void> updateColSatsFunc) { - if (config.getMetadataConfig().isColumnStatsIndexEnabled()) { + public static void updateColsToIndex(HoodieTable dataTable, + HoodieWriteConfig config, + HoodieCommitMetadata commitMetadata, + String commitActionType, + Functions.Function2<HoodieTableMetaClient, List<String>, Void> updateColStatsFunc) { + if (config.isMetadataTableEnabled() // this is a data table + && config.getMetadataConfig().isColumnStatsIndexEnabled() // the col_stats is enabled + && ActionType.isCommitActionType(commitActionType)) { // with interested actions dataTable.getMetaClient().reloadTableConfig(); - if (dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) { - try { - HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder() - .setStorage(dataTable.getStorage()) - .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath())) - .build(); - HoodieInstant latestInstant = mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get(); - final HoodieCommitMetadata mdtCommitMetadata = - mdtMetaClient.getActiveTimeline().readCommitMetadata(latestInstant); - if (mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) { - // update data table's table config for list of columns indexed. - List<String> columnsToIndex = new ArrayList(HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(), - Option.of(config.getRecordMerger().getRecordType())).keySet()); - // if col stats is getting updated, lets also update list of columns indexed if changed. - updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex); - } - } catch (Exception e) { - throw new HoodieException("Updating data table config to latest set of columns indexed with col stats failed ", e); - } + try { + // update data table's table config for list of columns indexed. + List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, dataTable.getMetaClient(), config.getMetadataConfig(), + Option.of(config.getRecordMerger().getRecordType())).keySet()); + // if col stats is getting updated, lets also update list of columns indexed if changed. + updateColStatsFunc.apply(dataTable.getMetaClient(), columnsToIndex); + } catch (Exception e) { + throw new HoodieException("Updating data table config to latest set of columns indexed with col stats failed ", e); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index aa78ba95c35..9709654edb0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -235,7 +235,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R> LOG.info("Committed " + instantTime); result.setCommitMetadata(Option.of(metadata)); // update cols to Index as applicable - HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, + HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, actionType, (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (metaClient, columnsToIndex) -> { updateColumnsToIndexForColumnStats(metaClient, columnsToIndex); return null; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index e285657dead..97bc998e865 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; import org.apache.hudi.io.ExplicitWriteHandleFactory; @@ -208,6 +207,6 @@ public abstract class BaseFlinkCommitActionExecutor<T> extends @Override protected void updateColumnsToIndexForColumnStats(HoodieTableMetaClient metaClient, List<String> columnsToIndex) { - throw new HoodieNotSupportedException("Col stats with flink is not yet supported"); + // no-op } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java index ea0ab7f3333..11b5df5b4fb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java @@ -22,5 +22,18 @@ package org.apache.hudi.common.model; * The supported action types. */ public enum ActionType { - commit, savepoint, compaction, clean, rollback, replacecommit, deltacommit, logcompaction, clustering + commit, + savepoint, + compaction, + clean, + rollback, + replacecommit, + deltacommit, + logcompaction, + clustering; + + public static boolean isCommitActionType(String actionTypeStr) { + ActionType actionType = ActionType.valueOf(actionTypeStr.toLowerCase()); + return actionType == commit || actionType == replacecommit || actionType == deltacommit; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index c7f33550262..ead5f77518f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1507,10 +1507,11 @@ public class HoodieTableMetadataUtil { HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); // NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields - Option<Schema> tableSchema = writerSchema.map(schema -> - tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema); + Option<Schema> tableSchema = writerSchema.isEmpty() + ? tableConfig.getTableCreateSchema() // the write schema does not set up correctly + : writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema); - return getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig, + return getColumnsToIndex(tableConfig, metadataConfig, Lazy.eagerly(tableSchema), false, recordTypeOpt); } @@ -1576,10 +1577,10 @@ public class HoodieTableMetadataUtil { Option<HoodieRecordType> recordType) { List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex(); if (!columnsToIndex.isEmpty()) { - // if explicitly overriden + // if explicitly overridden if (isTableInitializing) { Map<String, Schema> toReturn = new LinkedHashMap<>(); - columnsToIndex.stream().forEach(colName -> toReturn.put(colName, null)); + columnsToIndex.forEach(colName -> toReturn.put(colName, null)); return toReturn; } ValidationUtils.checkArgument(tableSchemaLazyOpt.get().isPresent(), "Table schema not found for the table while computing col stats"); @@ -1588,9 +1589,8 @@ public class HoodieTableMetadataUtil { Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>(); columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName)) .map(colName -> Pair.of(colName, HoodieAvroUtils.getSchemaForField(tableSchema.get(), colName).getRight().schema())) - .filter(fieldNameSchemaPair -> { - return isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType); - }).forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue())); + .filter(fieldNameSchemaPair -> isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType)) + .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue())); return colsToIndexSchemaMap; } // if not overridden @@ -1599,10 +1599,9 @@ public class HoodieTableMetadataUtil { tableSchemaLazyOpt.get().map(schema -> getFirstNSupportedFields(schema, metadataConfig.maxColumnsToIndexForColStats(), recordType)).orElse(Stream.empty()) .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue())); return colsToIndexSchemaMap; - } else if (isTableInitializing) { - return Collections.emptyMap(); } else { - throw new HoodieMetadataException("Cannot initialize col stats with empty list of cols"); + // initialize col stats index config with empty list of cols + return Collections.emptyMap(); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java index 1984a7c7bae..1e9f7f89fdf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java @@ -215,6 +215,7 @@ public class FlinkWriteClients { .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L) .build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withEngineType(EngineType.FLINK) // this affects the default value inference .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) .build())
