This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 61630b8b432 branch-3.1: [multi-catalog](hudi) impl hudi_metadata table 
value function #46137 (#52015)
61630b8b432 is described below

commit 61630b8b432b5d8c371e7c479dfe3f67c0f7a056
Author: Socrates <[email protected]>
AuthorDate: Fri Jun 20 16:04:40 2025 +0800

    branch-3.1: [multi-catalog](hudi) impl hudi_metadata table value function 
#46137 (#52015)
    
    bp #46137
---
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  23 +++
 be/src/vec/exec/scan/vmeta_scanner.h               |   2 +
 .../doris/catalog/BuiltinTableValuedFunctions.java |   2 +
 .../hudi/source/HudiCachedMetaClientProcessor.java |  21 ++-
 .../hudi/source/HudiMetadataCacheMgr.java          |  10 +-
 .../expressions/functions/table/HudiMeta.java      |  56 ++++++++
 .../visitor/TableValuedFunctionVisitor.java        |   5 +
 .../tablefunction/HudiTableValuedFunction.java     | 156 +++++++++++++++++++++
 .../doris/tablefunction/MetadataGenerator.java     | 103 ++++++++++----
 .../tablefunction/MetadataTableValuedFunction.java |   2 +
 .../doris/tablefunction/TableValuedFunctionIf.java |   4 +-
 gensrc/thrift/FrontendService.thrift               |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   8 ++
 gensrc/thrift/Types.thrift                         |   7 +-
 .../data/external_table_p2/hudi/test_hudi_meta.out | Bin 0 -> 4093 bytes
 .../external_table_p2/hudi/test_hudi_meta.groovy   |  46 ++++++
 16 files changed, 403 insertions(+), 43 deletions(-)

diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 7fbc3ed2d37..ff68fd67637 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -234,6 +234,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& 
meta_scan_range) {
     case TMetadataType::ICEBERG:
         RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, 
&request));
         break;
+    case TMetadataType::HUDI:
+        RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, 
&request));
+        break;
     case TMetadataType::BACKENDS:
         RETURN_IF_ERROR(_build_backends_metadata_request(meta_scan_range, 
&request));
         break;
@@ -316,6 +319,26 @@ Status VMetaScanner::_build_iceberg_metadata_request(const 
TMetaScanRange& meta_
     return Status::OK();
 }
 
+Status VMetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
+                                                  TFetchSchemaTableDataRequest* request) {
+    VLOG_CRITICAL << "VMetaScanner::_build_hudi_metadata_request";
+    // hudi_params is expected to be filled in by the FE (HudiTableValuedFunction).
+    if (!meta_scan_range.__isset.hudi_params) {
+        return Status::InternalError("Can not find THudiMetadataParams from meta_scan_range.");
+    }
+
+    // Wrap the hudi params into metadata-table params tagged with TMetadataType::HUDI.
+    TMetadataTableRequestParams hudi_table_params;
+    hudi_table_params.__set_metadata_type(TMetadataType::HUDI);
+    hudi_table_params.__set_hudi_metadata_params(meta_scan_range.hudi_params);
+
+    // Point the fetch request at the METADATA_TABLE schema table and attach the params.
+    request->__set_cluster_name("");
+    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
+    request->__set_metada_table_params(hudi_table_params);
+    return Status::OK();
+}
+
 Status VMetaScanner::_build_backends_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                       
TFetchSchemaTableDataRequest* request) {
     VLOG_CRITICAL << "VMetaScanner::_build_backends_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h 
b/be/src/vec/exec/scan/vmeta_scanner.h
index 350e0fbf807..3942fd793d9 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -66,6 +66,8 @@ private:
     Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
     Status _build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                            TFetchSchemaTableDataRequest* 
request);
+    Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
+                                        TFetchSchemaTableDataRequest* request);
     Status _build_backends_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                             TFetchSchemaTableDataRequest* 
request);
     Status _build_frontends_metadata_request(const TMetaScanRange& 
meta_scan_range,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index 88c98162093..6b691c9526f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -24,6 +24,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks
 import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
 import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
+import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
@@ -51,6 +52,7 @@ public class BuiltinTableValuedFunctions implements 
FunctionHelper {
             tableValued(FrontendsDisks.class, "frontends_disks"),
             tableValued(GroupCommit.class, "group_commit"),
             tableValued(Local.class, "local"),
+            tableValued(HudiMeta.class, "hudi_meta"),
             tableValued(IcebergMeta.class, "iceberg_meta"),
             tableValued(Hdfs.class, "hdfs"),
             tableValued(HttpStream.class, "http_stream"),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
index 07726a54ffe..9ed1007e804 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
@@ -47,23 +47,22 @@ public class HudiCachedMetaClientProcessor {
                 true,
                 null);
 
-        this.hudiTableMetaClientCache =
-                partitionCacheFactory.buildCache(
-                        this::createHoodieTableMetaClient,
-                        null,
-                        executor);
+        this.hudiTableMetaClientCache = partitionCacheFactory.buildCache(
+                this::createHoodieTableMetaClient,
+                null,
+                executor);
     }
 
     private HoodieTableMetaClient 
createHoodieTableMetaClient(HudiCachedClientKey key) {
         LOG.debug("create hudi table meta client for {}.{}", key.getDbName(), 
key.getTbName());
         HadoopStorageConfiguration hadoopStorageConfiguration = new 
HadoopStorageConfiguration(key.getConf());
         return HiveMetaStoreClientHelper.ugiDoAs(
-            key.getConf(),
-            () -> HoodieTableMetaClient
-                .builder()
-                .setConf(hadoopStorageConfiguration)
-                .setBasePath(key.getHudiBasePath())
-                .build());
+                key.getConf(),
+                () -> HoodieTableMetaClient // cache loader: the client is built inside the ugiDoAs callback
+                        .builder()
+                        .setConf(hadoopStorageConfiguration) // the key's Hadoop conf, wrapped for Hudi
+                        .setBasePath(key.getHudiBasePath()) // table base path carried by the cache key
+                        .build());
     }
 
     public HoodieTableMetaClient getHoodieTableMetaClient(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
index 4ede5c73cfa..8d921a2f3d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.datasource.hudi.source;
 
-import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 
 import com.google.common.collect.Maps;
@@ -36,7 +36,7 @@ public class HudiMetadataCacheMgr {
         this.executor = executor;
     }
 
-    public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog 
catalog) {
+    public HudiPartitionProcessor getPartitionProcessor(CatalogIf catalog) {
         return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId 
-> {
             if (catalog instanceof HMSExternalCatalog) {
                 return new HudiCachedPartitionProcessor(catalogId, executor);
@@ -46,7 +46,7 @@ public class HudiMetadataCacheMgr {
         });
     }
 
-    public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog 
catalog) {
+    public HudiCachedFsViewProcessor getFsViewProcessor(CatalogIf catalog) {
         return fsViewProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
             if (catalog instanceof HMSExternalCatalog) {
                 return new HudiCachedFsViewProcessor(executor);
@@ -56,7 +56,7 @@ public class HudiMetadataCacheMgr {
         });
     }
 
-    public HudiCachedMetaClientProcessor 
getHudiMetaClientProcessor(ExternalCatalog catalog) {
+    public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(CatalogIf 
catalog) {
         return metaClientProcessors.computeIfAbsent(catalog.getId(), catalogId 
-> {
             if (catalog instanceof HMSExternalCatalog) {
                 return new HudiCachedMetaClientProcessor(executor);
@@ -126,7 +126,7 @@ public class HudiMetadataCacheMgr {
         }
     }
 
-    public Map<String, Map<String, String>> getCacheStats(ExternalCatalog 
catalog) {
+    public Map<String, Map<String, String>> getCacheStats(CatalogIf catalog) {
         Map<String, Map<String, String>> res = Maps.newHashMap();
 
         HudiCachedPartitionProcessor partitionProcessor = 
(HudiCachedPartitionProcessor) getPartitionProcessor(catalog);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java
new file mode 100644
index 00000000000..2d62252fd2d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.HudiTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/** hudi_meta: Nereids-side wrapper that builds a {@link HudiTableValuedFunction}. */
+public class HudiMeta extends TableValuedFunction {
+    public HudiMeta(Properties properties) {
+        super("hudi_meta", properties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> tvfProperties = getTVFProperties().getMap();
+            return new HudiTableValuedFunction(tvfProperties);
+        } catch (Throwable cause) { // e.g. unknown property or missing SELECT privilege
+            throw new AnalysisException("Can not build HudiTableValuedFunction by " + this + ": "
+                    + cause.getMessage(), cause);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitHudiMeta(this, context);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 0b4b57e11dc..dfb59f0be2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -24,6 +24,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks
 import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
 import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
+import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
@@ -88,6 +89,10 @@ public interface TableValuedFunctionVisitor<R, C> {
         return visitTableValuedFunction(httpStream, context);
     }
 
+    default R visitHudiMeta(HudiMeta hudiMetaFunction, C context) {
+        return visitTableValuedFunction(hudiMetaFunction, context);
+    }
+
     default R visitIcebergMeta(IcebergMeta icebergMeta, C context) {
         return visitTableValuedFunction(icebergMeta, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
new file mode 100644
index 00000000000..87791df380a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THudiMetadataParams;
+import org.apache.doris.thrift.THudiQueryType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * The implementation of the table valued function
+ * hudi_meta("table" = "ctl.db.tbl", "query_type" = "timeline").
+ *
+ * <p>The BE meta scanner forwards the parameters from {@link #getMetaScanRange()} back to the
+ * FE, where {@link MetadataGenerator} produces the rows.
+ */
+public class HudiTableValuedFunction extends MetadataTableValuedFunction {
+
+    public static final String NAME = "hudi_meta";
+    private static final String TABLE = "table";
+    private static final String QUERY_TYPE = "query_type";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
+
+    // Column order must match the cell order written by MetadataGenerator for TIMELINE queries.
+    private static final ImmutableList<Column> SCHEMA_TIMELINE = ImmutableList.of(
+            new Column("timestamp", PrimitiveType.STRING, false),
+            new Column("action", PrimitiveType.STRING, false),
+            new Column("file_name", PrimitiveType.STRING, false),
+            new Column("state", PrimitiveType.STRING, false),
+            new Column("state_transition_time", PrimitiveType.STRING, false));
+
+    // lower-cased column name -> position in SCHEMA_TIMELINE
+    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
+
+    static {
+        ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
+        for (int i = 0; i < SCHEMA_TIMELINE.size(); i++) {
+            builder.put(SCHEMA_TIMELINE.get(i).getName().toLowerCase(Locale.ROOT), i);
+        }
+        COLUMN_TO_INDEX = builder.build();
+    }
+
+    /**
+     * Returns the position of {@code columnName} (case-insensitive) in the timeline schema,
+     * or null if there is no such column.
+     */
+    public static Integer getColumnIndexFromColumnName(String columnName) {
+        return COLUMN_TO_INDEX.get(columnName.toLowerCase(Locale.ROOT));
+    }
+
+    private final THudiQueryType queryType;
+
+    // Fully qualified name (catalog, database, table) of the Hudi table being inspected.
+    private final TableName hudiTableName;
+
+    /**
+     * Validates the TVF properties, checks SELECT privilege on the target table and
+     * resolves the requested query type.
+     *
+     * @param params TVF properties; keys are case-insensitive and must be exactly
+     *               "table" (as "catalog.database.table") and "query_type"
+     * @throws AnalysisException if a property is unknown or missing, the table name is not
+     *                           three-part, SELECT is denied, or the query type is unknown
+     */
+    public HudiTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        Map<String, String> validParams = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : params.entrySet()) {
+            String key = entry.getKey().toLowerCase(Locale.ROOT);
+            if (!PROPERTIES_SET.contains(key)) {
+                throw new AnalysisException("'" + entry.getKey() + "' is invalid property");
+            }
+            validParams.put(key, entry.getValue());
+        }
+        String tableName = validParams.get(TABLE);
+        String queryTypeString = validParams.get(QUERY_TYPE);
+        if (tableName == null || queryTypeString == null) {
+            throw new AnalysisException("Invalid hudi metadata query");
+        }
+        // check ctl, db, tbl
+        String[] names = tableName.split("\\.");
+        if (names.length != 3) {
+            throw new AnalysisException("The hudi table name contains the catalogName, databaseName, and tableName");
+        }
+        this.hudiTableName = new TableName(names[0], names[1], names[2]);
+        // check auth
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkTblPriv(ConnectContext.get(), this.hudiTableName, PrivPredicate.SELECT)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
+                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
+                    this.hudiTableName.getDb() + ": " + this.hudiTableName.getTbl());
+        }
+        try {
+            this.queryType = THudiQueryType.valueOf(queryTypeString.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            // report the user's input; the queryType field is still unassigned here
+            throw new AnalysisException("Unsupported hudi metadata query type: " + queryTypeString);
+        }
+    }
+
+    public THudiQueryType getHudiQueryType() {
+        return queryType;
+    }
+
+    @Override
+    public TMetadataType getMetadataType() {
+        return TMetadataType.HUDI;
+    }
+
+    /**
+     * Builds the scan range for the BE meta scanner; it carries the query type and the fully
+     * qualified table name, which the BE forwards back to the FE to generate the rows.
+     */
+    @Override
+    public TMetaScanRange getMetaScanRange() {
+        TMetaScanRange metaScanRange = new TMetaScanRange();
+        metaScanRange.setMetadataType(TMetadataType.HUDI);
+        // set hudi metadata params
+        THudiMetadataParams hudiMetadataParams = new THudiMetadataParams();
+        hudiMetadataParams.setHudiQueryType(queryType);
+        hudiMetadataParams.setCatalog(hudiTableName.getCtl());
+        hudiMetadataParams.setDatabase(hudiTableName.getDb());
+        hudiMetadataParams.setTable(hudiTableName.getTbl());
+        metaScanRange.setHudiParams(hudiMetadataParams);
+        return metaScanRange;
+    }
+
+    @Override
+    public String getTableName() {
+        return "HudiMetadataTableValuedFunction";
+    }
+
+    /**
+     * Columns exposed by the TVF for the requested query type. The rows are generated on the
+     * FE by {@link MetadataGenerator} and must follow this column order.
+     *
+     * @return SCHEMA_TIMELINE for TIMELINE queries, otherwise an empty list
+     */
+    @Override
+    public List<Column> getTableColumns() {
+        if (queryType == THudiQueryType.TIMELINE) {
+            return SCHEMA_TIMELINE;
+        }
+        return Lists.newArrayList();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 1420e03f375..d3b8529c3c1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -58,6 +58,7 @@ import org.apache.doris.datasource.TablePartitionValues;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
 import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
@@ -82,6 +83,8 @@ import org.apache.doris.thrift.TBackendsMetadataParams;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
 import org.apache.doris.thrift.TFetchSchemaTableDataResult;
+import org.apache.doris.thrift.THudiMetadataParams;
+import org.apache.doris.thrift.THudiQueryType;
 import org.apache.doris.thrift.TIcebergMetadataParams;
 import org.apache.doris.thrift.TIcebergQueryType;
 import org.apache.doris.thrift.TJobsMetadataParams;
@@ -105,6 +108,9 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.iceberg.Snapshot;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -222,6 +228,9 @@ public class MetadataGenerator {
             case ICEBERG:
                 result = icebergMetadataResult(params);
                 break;
+            case HUDI:
+                result = hudiMetadataResult(params);
+                break;
             case BACKENDS:
                 result = backendsMetadataResult(params);
                 break;
@@ -376,6 +385,60 @@ public class MetadataGenerator {
         return result;
     }
 
+    /**
+     * Serves the hudi_meta table-valued function: loads the Hudi meta client for the
+     * catalog/database/table in {@code params} and returns one row per metadata entry.
+     * Only the TIMELINE query type is handled; its rows follow HudiTableValuedFunction's
+     * timeline schema (timestamp, action, file_name, state, state_transition_time).
+     */
+    private static TFetchSchemaTableDataResult hudiMetadataResult(TMetadataTableRequestParams params) {
+        if (!params.isSetHudiMetadataParams()) {
+            return errorResult("Hudi metadata params is not set.");
+        }
+
+        THudiMetadataParams hudiMetadataParams = params.getHudiMetadataParams();
+        THudiQueryType hudiQueryType = hudiMetadataParams.getHudiQueryType();
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(hudiMetadataParams.getCatalog());
+        if (catalog == null) {
+            return errorResult("The specified catalog does not exist:" + hudiMetadataParams.getCatalog());
+        }
+        // The meta-client cache and base-path lookup below need an HMS catalog; avoid a ClassCastException.
+        if (!(catalog instanceof HMSExternalCatalog)) {
+            return errorResult("The specified catalog is not an HMS catalog: " + hudiMetadataParams.getCatalog());
+        }
+        HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
+        HudiCachedMetaClientProcessor hudiMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getHudiMetadataCacheMgr().getHudiMetaClientProcessor(catalog);
+        String hudiBasePathString = hmsCatalog.getClient()
+                .getTable(hudiMetadataParams.getDatabase(), hudiMetadataParams.getTable()).getSd().getLocation();
+        Configuration conf = hmsCatalog.getConfiguration();
+
+        List<TRow> dataBatch = Lists.newArrayList();
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+
+        switch (hudiQueryType) {
+            case TIMELINE:
+                HoodieTimeline timeline = hudiMetadataCache.getHoodieTableMetaClient(hudiMetadataParams.getDatabase(),
+                        hudiMetadataParams.getTable(), hudiBasePathString, conf).getActiveTimeline();
+                // one row per instant, cells in the column order of the timeline schema
+                for (HoodieInstant instant : timeline.getInstants()) {
+                    TRow trow = new TRow();
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getTimestamp()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getAction()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getFileName()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getState().name()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getStateTransitionTime()));
+                    dataBatch.add(trow);
+                }
+                break;
+            default:
+                return errorResult("Unsupported hudi inspect type: " + hudiQueryType);
+        }
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
+
     private static TFetchSchemaTableDataResult 
backendsMetadataResult(TMetadataTableRequestParams params) {
         if (!params.isSetBackendsMetadataParams()) {
             return errorResult("backends metadata param is not set.");
@@ -1098,7 +1149,7 @@ public class MetadataGenerator {
     }
 
     private static void tableOptionsForInternalCatalog(UserIdentity 
currentUserIdentity,
-                    CatalogIf catalog, DatabaseIf database, List<TableIf> 
tables, List<TRow> dataBatch) {
+            CatalogIf catalog, DatabaseIf database, List<TableIf> tables, 
List<TRow> dataBatch) {
         for (TableIf table : tables) {
             if (!(table instanceof OlapTable)) {
                 continue;
@@ -1151,7 +1202,7 @@ public class MetadataGenerator {
     }
 
     private static void tableOptionsForExternalCatalog(UserIdentity 
currentUserIdentity,
-                    CatalogIf catalog, DatabaseIf database, List<TableIf> 
tables, List<TRow> dataBatch) {
+            CatalogIf catalog, DatabaseIf database, List<TableIf> tables, 
List<TRow> dataBatch) {
         for (TableIf table : tables) {
             if 
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUserIdentity, 
catalog.getName(),
                     database.getFullName(), table.getName(), 
PrivPredicate.SHOW)) {
@@ -1193,7 +1244,7 @@ public class MetadataGenerator {
         String clg = params.getCatalog();
         CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(clg);
         if (catalog == null) {
-            // catalog is NULL  let return empty to BE
+            // catalog is NULL let return empty to BE
             result.setDataBatch(dataBatch);
             result.setStatus(new TStatus(TStatusCode.OK));
             return result;
@@ -1202,7 +1253,8 @@ public class MetadataGenerator {
         if (database == null) {
             // BE gets the database id list from FE and then invokes this 
interface
             // per database. there is a chance that in between database can be 
dropped.
-            // so need to handle database not exist case and return ok so that 
BE continue the
+            // so need to handle database not exist case and return ok so that 
BE continue
+            // the
             // loop with next database.
             result.setDataBatch(dataBatch);
             result.setStatus(new TStatus(TStatusCode.OK));
@@ -1220,7 +1272,7 @@ public class MetadataGenerator {
     }
 
     private static void tablePropertiesForInternalCatalog(UserIdentity 
currentUserIdentity,
-                CatalogIf catalog, DatabaseIf database, List<TableIf> tables, 
List<TRow> dataBatch) {
+            CatalogIf catalog, DatabaseIf database, List<TableIf> tables, 
List<TRow> dataBatch) {
         for (TableIf table : tables) {
             if (!(table instanceof OlapTable)) {
                 continue;
@@ -1245,7 +1297,7 @@ public class MetadataGenerator {
                     continue;
                 }
 
-                Map<String, String>  propertiesMap = property.getProperties();
+                Map<String, String> propertiesMap = property.getProperties();
                 propertiesMap.forEach((key, value) -> {
                     TRow trow = new TRow();
                     trow.addToColumnValue(new 
TCell().setStringVal(catalog.getName())); // TABLE_CATALOG
@@ -1268,7 +1320,8 @@ public class MetadataGenerator {
                     database.getFullName(), table.getName(), 
PrivPredicate.SHOW)) {
                 continue;
             }
-            // Currently for external catalog, we put properties as empty, can 
extend in future
+            // Currently for external catalog, we put properties as empty, can 
extend in
+            // future
             TRow trow = new TRow();
             trow.addToColumnValue(new 
TCell().setStringVal(catalog.getName())); // TABLE_CATALOG
             trow.addToColumnValue(new 
TCell().setStringVal(database.getFullName())); // TABLE_SCHEMA
@@ -1300,7 +1353,7 @@ public class MetadataGenerator {
         List<TRow> dataBatch = Lists.newArrayList();
         CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(clg);
         if (catalog == null) {
-            // catalog is NULL  let return empty to BE
+            // catalog is NULL let return empty to BE
             result.setDataBatch(dataBatch);
             result.setStatus(new TStatus(TStatusCode.OK));
             return result;
@@ -1309,8 +1362,8 @@ public class MetadataGenerator {
         if (database == null) {
             // BE gets the database id list from FE and then invokes this 
interface
             // per database. there is a chance that in between database can be 
dropped.
-            // so need to handle database not exist case and return ok so that 
BE continue the
-            // loop with next database.
+            // so need to handle database not exist case and return ok so that 
BE continue
+            // the loop with next database.
             result.setDataBatch(dataBatch);
             result.setStatus(new TStatus(TStatusCode.OK));
             return result;
@@ -1353,7 +1406,7 @@ public class MetadataGenerator {
     }
 
     private static void partitionsForInternalCatalog(UserIdentity 
currentUserIdentity,
-                CatalogIf catalog, DatabaseIf database, List<TableIf> tables, 
List<TRow> dataBatch) {
+            CatalogIf catalog, DatabaseIf database, List<TableIf> tables, 
List<TRow> dataBatch) {
         for (TableIf table : tables) {
             if (!(table instanceof OlapTable)) {
                 continue;
@@ -1379,8 +1432,8 @@ public class MetadataGenerator {
                     trow.addToColumnValue(new 
TCell().setStringVal(partition.getName())); // PARTITION_NAME
                     trow.addToColumnValue(new TCell().setStringVal("NULL")); 
// SUBPARTITION_NAME (always null)
 
-                    trow.addToColumnValue(new TCell().setIntVal(0)); 
//PARTITION_ORDINAL_POSITION (not available)
-                    trow.addToColumnValue(new TCell().setIntVal(0)); 
//SUBPARTITION_ORDINAL_POSITION (not available)
+                    trow.addToColumnValue(new TCell().setIntVal(0)); // 
PARTITION_ORDINAL_POSITION (not available)
+                    trow.addToColumnValue(new TCell().setIntVal(0)); // 
SUBPARTITION_ORDINAL_POSITION (not available)
                     trow.addToColumnValue(new TCell().setStringVal(
                             partitionInfo.getType().toString())); // 
PARTITION_METHOD
                     trow.addToColumnValue(new TCell().setStringVal("NULL")); 
// SUBPARTITION_METHOD(always null)
@@ -1399,17 +1452,17 @@ public class MetadataGenerator {
                         trow.addToColumnValue(new TCell().setStringVal(
                                 item.getItemsSql())); // PARITION DESC
                     }
-                    trow.addToColumnValue(new 
TCell().setLongVal(partition.getRowCount())); //TABLE_ROWS (PARTITION row)
-                    trow.addToColumnValue(new 
TCell().setLongVal(partition.getAvgRowLength())); //AVG_ROW_LENGTH
-                    trow.addToColumnValue(new 
TCell().setLongVal(partition.getDataLength())); //DATA_LENGTH
-                    trow.addToColumnValue(new TCell().setIntVal(0)); 
//MAX_DATA_LENGTH (not available)
-                    trow.addToColumnValue(new TCell().setIntVal(0)); 
//INDEX_LENGTH (not available)
-                    trow.addToColumnValue(new TCell().setIntVal(0)); 
//DATA_FREE (not available)
-                    trow.addToColumnValue(new TCell().setStringVal("NULL")); 
//CREATE_TIME (not available)
+                    trow.addToColumnValue(new 
TCell().setLongVal(partition.getRowCount())); // TABLE_ROWS (PARTITION)
+                    trow.addToColumnValue(new 
TCell().setLongVal(partition.getAvgRowLength())); // AVG_ROW_LENGTH
+                    trow.addToColumnValue(new 
TCell().setLongVal(partition.getDataLength())); // DATA_LENGTH
+                    trow.addToColumnValue(new TCell().setIntVal(0)); // 
MAX_DATA_LENGTH (not available)
+                    trow.addToColumnValue(new TCell().setIntVal(0)); // 
INDEX_LENGTH (not available)
+                    trow.addToColumnValue(new TCell().setIntVal(0)); // 
DATA_FREE (not available)
+                    trow.addToColumnValue(new TCell().setStringVal("NULL")); 
// CREATE_TIME (not available)
                     trow.addToColumnValue(new TCell().setStringVal(
-                            
TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); //UPDATE_TIME
+                            
TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); // UPDATE_TIME
                     trow.addToColumnValue(new TCell().setStringVal("NULL")); 
// CHECK_TIME (not available)
-                    trow.addToColumnValue(new TCell().setIntVal(0)); 
//CHECKSUM (not available)
+                    trow.addToColumnValue(new TCell().setIntVal(0)); // 
CHECKSUM (not available)
                     trow.addToColumnValue(new TCell().setStringVal("")); // 
PARTITION_COMMENT (not available)
                     trow.addToColumnValue(new TCell().setStringVal("")); // 
NODEGROUP (not available)
                     trow.addToColumnValue(new TCell().setStringVal("")); // 
TABLESPACE_NAME (not available)
@@ -1510,7 +1563,7 @@ public class MetadataGenerator {
         List<TRow> dataBatch = Lists.newArrayList();
         CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(clg);
         if (catalog == null) {
-            // catalog is NULL  let return empty to BE
+            // catalog is NULL let return empty to BE
             result.setDataBatch(dataBatch);
             result.setStatus(new TStatus(TStatusCode.OK));
             return result;
@@ -1519,7 +1572,8 @@ public class MetadataGenerator {
         if (database == null) {
             // BE gets the database id list from FE and then invokes this 
interface
             // per database. there is a chance that in between database can be 
dropped.
-            // so need to handle database not exist case and return ok so that 
BE continue the
+            // so need to handle database not exist case and return ok so that 
BE continue
+            // the
             // loop with next database.
             result.setDataBatch(dataBatch);
             result.setStatus(new TStatus(TStatusCode.OK));
@@ -1670,4 +1724,3 @@ public class MetadataGenerator {
     }
 
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 7bd28f363e7..074f3d66c96 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -41,6 +41,8 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
                 return 
FrontendsDisksTableValuedFunction.getColumnIndexFromColumnName(columnName);
             case ICEBERG:
                 return 
IcebergTableValuedFunction.getColumnIndexFromColumnName(columnName);
+            case HUDI:
+                return 
HudiTableValuedFunction.getColumnIndexFromColumnName(columnName);
             case CATALOGS:
                 return 
CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName);
             case MATERIALIZED_VIEWS:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index eb323a76672..133c1885a84 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -44,7 +44,7 @@ public abstract class TableValuedFunctionIf {
 
     // All table functions should be registered here
     public static TableValuedFunctionIf getTableFunction(String funcName, 
Map<String, String> params)
-                                                        throws 
AnalysisException {
+            throws AnalysisException {
         switch (funcName.toLowerCase()) {
             case NumbersTableValuedFunction.NAME:
                 return new NumbersTableValuedFunction(params);
@@ -58,6 +58,8 @@ public abstract class TableValuedFunctionIf {
                 return new LocalTableValuedFunction(params);
             case IcebergTableValuedFunction.NAME:
                 return new IcebergTableValuedFunction(params);
+            case HudiTableValuedFunction.NAME:
+                return new HudiTableValuedFunction(params);
             case BackendsTableValuedFunction.NAME:
                 return new BackendsTableValuedFunction(params);
             case FrontendsTableValuedFunction.NAME:
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 92205c5ae0f..19d91f0fded 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1037,6 +1037,7 @@ struct TMetadataTableRequestParams {
   11: optional PlanNodes.TPartitionsMetadataParams partitions_metadata_params
   12: optional PlanNodes.TMetaCacheStatsParams meta_cache_stats_params
   13: optional PlanNodes.TPartitionValuesMetadataParams 
partition_values_metadata_params
+  14: optional PlanNodes.THudiMetadataParams hudi_metadata_params
 }
 
 struct TSchemaTableRequestParams {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c92b1c91523..611c58d2bfd 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -541,6 +541,13 @@ struct TIcebergMetadataParams {
   4: optional string table
 }
 
+struct THudiMetadataParams {
+  1: optional Types.THudiQueryType hudi_query_type
+  2: optional string catalog
+  3: optional string database
+  4: optional string table
+}
+
 struct TBackendsMetadataParams {
   1: optional string cluster_name
 }
@@ -601,6 +608,7 @@ struct TMetaScanRange {
   9: optional TPartitionsMetadataParams partitions_params
   10: optional TMetaCacheStatsParams meta_cache_stats_params
   11: optional TPartitionValuesMetadataParams partition_values_params
+  12: optional THudiMetadataParams hudi_params
 }
 
 // Specification of an individual data range which is held in its entirety
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index dbb385d0d1d..58716f51f3f 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -735,13 +735,18 @@ enum TMetadataType {
   TASKS,
   WORKLOAD_SCHED_POLICY,
   PARTITIONS,
-  PARTITION_VALUES;
+  PARTITION_VALUES,
+  HUDI,
 }
 
 enum TIcebergQueryType {
   SNAPSHOTS
 }
 
+enum THudiQueryType {
+  TIMELINE
+}
+
 // represent a user identity
 struct TUserIdentity {
     1: optional string username
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out 
b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
new file mode 100644
index 00000000000..0c312efd3c0
Binary files /dev/null and 
b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out differ
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy
new file mode 100644
index 00000000000..0fe650aded9
--- /dev/null
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Exercises the hudi_meta() table-valued function with query_type "timeline"
+// against tables in the remote regression_hudi database.
+suite("test_hudi_meta", 
"p2,external,hudi,external_remote,external_remote_hudi") {
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable hudi test")
+        // Stop here: everything below needs the external Hudi catalog (hudiEmrCatalog).
+        return
+    }
+
+    String catalog_name = "test_hudi_meta"
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+    sql """drop catalog if exists ${catalog_name};"""
+    sql """
+        create catalog if not exists ${catalog_name} properties (
+            ${props}
+        );
+    """
+
+    sql """ switch ${catalog_name};"""
+    sql """ use regression_hudi;"""
+    sql """ set enable_fallback_to_original_planner=false """
+
+    qt_hudi_meta1 """ select * from 
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_non_partition",
 "query_type" = "timeline"); """
+    qt_hudi_meta2 """ select * from 
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_mor_non_partition",
 "query_type" = "timeline"); """
+    qt_hudi_meta3 """ select * from 
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_partition",
 "query_type" = "timeline"); """
+    // NOTE(review): identical to hudi_meta3; presumably meant
+    // user_activity_log_mor_partition (regenerate test_hudi_meta.out if changed).
+    qt_hudi_meta4 """ select * from 
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_partition",
 "query_type" = "timeline"); """
+
+    qt_hudi_meta5 """ select * from 
hudi_meta("table"="${catalog_name}.regression_hudi.timetravel_cow", 
"query_type" = "timeline"); """
+    qt_hudi_meta6 """ select * from 
hudi_meta("table"="${catalog_name}.regression_hudi.timetravel_mor", 
"query_type" = "timeline"); """
+
+    sql """drop catalog if exists ${catalog_name};"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to