This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch partition-bucket-index
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 39f1481fb78a1adce63a3c271af35c6b55b5e207
Author: Danny Chan <[email protected]>
AuthorDate: Tue Mar 18 05:38:25 2025 +0800

    [HUDI-9175] Remove the unnecessary MDT metadata check for col_stats index config update (#12977)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../hudi/client/HoodieColumnStatsIndexUtils.java   | 45 +++++++++-------------
 .../action/commit/BaseCommitActionExecutor.java    |  2 +-
 .../commit/BaseFlinkCommitActionExecutor.java      |  3 +-
 .../org/apache/hudi/common/model/ActionType.java   | 15 +++++++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 21 +++++-----
 .../org/apache/hudi/util/FlinkWriteClients.java    |  1 +
 7 files changed, 47 insertions(+), 42 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index bcd0b587ee9..efd94e90540 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -284,7 +284,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, 
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, 
commitActionType, instantTime), Option.of(metadata));
     // update cols to Index as applicable
-    HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata,
+    HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, 
commitActionType,
         (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(metaClient, columnsToIndex) -> {
           updateColumnsToIndexWithColStats(metaClient, columnsToIndex);
           return null;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
index d24e70d65f2..e8c61ed87c4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieColumnStatsIndexUtils.java
@@ -19,17 +19,15 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.ArrayList;
@@ -47,32 +45,27 @@ public class HoodieColumnStatsIndexUtils {
    * @param dataTable {@link HoodieTable} of interest.
    * @param config {@link HoodieWriteConfig} of interest.
    * @param commitMetadata commit metadata of interest.
-   * @param updateColSatsFunc function to assist with updating columns to 
index.
+   * @param commitActionType commit action type to include interested actions.
+   * @param updateColStatsFunc function to assist with updating columns to 
index.
    */
   @VisibleForTesting
-  public static void updateColsToIndex(HoodieTable dataTable, 
HoodieWriteConfig config, HoodieCommitMetadata commitMetadata,
-                                Functions.Function2<HoodieTableMetaClient, 
List<String>, Void> updateColSatsFunc) {
-    if (config.getMetadataConfig().isColumnStatsIndexEnabled()) {
+  public static void updateColsToIndex(HoodieTable dataTable,
+                                       HoodieWriteConfig config,
+                                       HoodieCommitMetadata commitMetadata,
+                                       String commitActionType,
+                                       
Functions.Function2<HoodieTableMetaClient, List<String>, Void> 
updateColStatsFunc) {
+    if (config.isMetadataTableEnabled()                            // this is 
a data table
+        && config.getMetadataConfig().isColumnStatsIndexEnabled()  // the 
col_stats is enabled
+        && ActionType.isCommitActionType(commitActionType)) {      // with 
interested actions
       dataTable.getMetaClient().reloadTableConfig();
-      if 
(dataTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {
-        try {
-          HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
-              .setStorage(dataTable.getStorage())
-              
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(dataTable.getMetaClient().getBasePath()))
-              .build();
-          HoodieInstant latestInstant = 
mdtMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant().get();
-          final HoodieCommitMetadata mdtCommitMetadata =
-              
mdtMetaClient.getActiveTimeline().readCommitMetadata(latestInstant);
-          if 
(mdtCommitMetadata.getPartitionToWriteStats().containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
 {
-            // update data table's table config for list of columns indexed.
-            List<String> columnsToIndex = new 
ArrayList(HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, 
dataTable.getMetaClient(), config.getMetadataConfig(),
-                Option.of(config.getRecordMerger().getRecordType())).keySet());
-            // if col stats is getting updated, lets also update list of 
columns indexed if changed.
-            updateColSatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
-          }
-        } catch (Exception e) {
-          throw new HoodieException("Updating data table config to latest set 
of columns indexed with col stats failed ", e);
-        }
+      try {
+        // update data table's table config for list of columns indexed.
+        List<String> columnsToIndex = new 
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(commitMetadata, 
dataTable.getMetaClient(), config.getMetadataConfig(),
+            Option.of(config.getRecordMerger().getRecordType())).keySet());
+        // if col stats is getting updated, lets also update list of columns 
indexed if changed.
+        updateColStatsFunc.apply(dataTable.getMetaClient(), columnsToIndex);
+      } catch (Exception e) {
+        throw new HoodieException("Updating data table config to latest set of 
columns indexed with col stats failed ", e);
       }
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index aa78ba95c35..9709654edb0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -235,7 +235,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
       LOG.info("Committed " + instantTime);
       result.setCommitMetadata(Option.of(metadata));
       // update cols to Index as applicable
-      HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata,
+      HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, 
actionType,
           (Functions.Function2<HoodieTableMetaClient, List<String>, Void>) 
(metaClient, columnsToIndex) -> {
             updateColumnsToIndexForColumnStats(metaClient, columnsToIndex);
             return null;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index e285657dead..97bc998e865 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.FlinkLazyInsertIterable;
 import org.apache.hudi.io.ExplicitWriteHandleFactory;
@@ -208,6 +207,6 @@ public abstract class BaseFlinkCommitActionExecutor<T> extends
 
   @Override
   protected void updateColumnsToIndexForColumnStats(HoodieTableMetaClient 
metaClient, List<String> columnsToIndex) {
-    throw new HoodieNotSupportedException("Col stats with flink is not yet 
supported");
+    // no-op
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
index ea0ab7f3333..11b5df5b4fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
@@ -22,5 +22,18 @@ package org.apache.hudi.common.model;
  * The supported action types.
  */
 public enum ActionType {
-  commit, savepoint, compaction, clean, rollback, replacecommit, deltacommit, 
logcompaction, clustering
+  commit,
+  savepoint,
+  compaction,
+  clean,
+  rollback,
+  replacecommit,
+  deltacommit,
+  logcompaction,
+  clustering;
+
+  public static boolean isCommitActionType(String actionTypeStr) {
+    ActionType actionType = ActionType.valueOf(actionTypeStr.toLowerCase());
+    return actionType == commit || actionType == replacecommit || actionType 
== deltacommit;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index c7f33550262..ead5f77518f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1507,10 +1507,11 @@ public class HoodieTableMetadataUtil {
     HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
 
     // NOTE: Writer schema added to commit metadata will not contain Hudi's 
metadata fields
-    Option<Schema> tableSchema = writerSchema.map(schema ->
-        tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+    Option<Schema> tableSchema = writerSchema.isEmpty()
+        ? tableConfig.getTableCreateSchema() // the write schema does not set 
up correctly
+        : writerSchema.map(schema -> tableConfig.populateMetaFields() ? 
addMetadataFields(schema) : schema);
 
-    return getColumnsToIndex(dataMetaClient.getTableConfig(), metadataConfig,
+    return getColumnsToIndex(tableConfig, metadataConfig,
         Lazy.eagerly(tableSchema), false, recordTypeOpt);
   }
 
@@ -1576,10 +1577,10 @@ public class HoodieTableMetadataUtil {
                                                                            
Option<HoodieRecordType> recordType) {
     List<String> columnsToIndex = 
metadataConfig.getColumnsEnabledForColumnStatsIndex();
     if (!columnsToIndex.isEmpty()) {
-      // if explicitly overriden
+      // if explicitly overridden
       if (isTableInitializing) {
         Map<String, Schema> toReturn = new LinkedHashMap<>();
-        columnsToIndex.stream().forEach(colName -> toReturn.put(colName, 
null));
+        columnsToIndex.forEach(colName -> toReturn.put(colName, null));
         return toReturn;
       }
       ValidationUtils.checkArgument(tableSchemaLazyOpt.get().isPresent(), 
"Table schema not found for the table while computing col stats");
@@ -1588,9 +1589,8 @@ public class HoodieTableMetadataUtil {
       Map<String, Schema> colsToIndexSchemaMap = new LinkedHashMap<>();
       columnsToIndex.stream().filter(fieldName -> 
!META_COL_SET_TO_INDEX.contains(fieldName))
           .map(colName -> Pair.of(colName, 
HoodieAvroUtils.getSchemaForField(tableSchema.get(), 
colName).getRight().schema()))
-          .filter(fieldNameSchemaPair -> {
-            return isColumnTypeSupported(fieldNameSchemaPair.getValue(), 
recordType);
-          }).forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), 
entry.getValue()));
+          .filter(fieldNameSchemaPair -> 
isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType))
+          .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), 
entry.getValue()));
       return colsToIndexSchemaMap;
     }
     // if not overridden
@@ -1599,10 +1599,9 @@ public class HoodieTableMetadataUtil {
       tableSchemaLazyOpt.get().map(schema -> getFirstNSupportedFields(schema, 
metadataConfig.maxColumnsToIndexForColStats(), 
recordType)).orElse(Stream.empty())
           .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), 
entry.getValue()));
       return colsToIndexSchemaMap;
-    } else if (isTableInitializing) {
-      return Collections.emptyMap();
     } else {
-      throw new HoodieMetadataException("Cannot initialize col stats with 
empty list of cols");
+      // initialize col stats index config with empty list of cols
+      return Collections.emptyMap();
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 1984a7c7bae..1e9f7f89fdf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -215,6 +215,7 @@ public class FlinkWriteClients {
                 
.parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 
1024 * 1024L)
                 .build())
             .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+                .withEngineType(EngineType.FLINK) // this affects the default 
value inference
                 .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
                 
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                 .build())

Reply via email to