This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 61630b8b432 branch-3.1: [multi-catalog](hudi) impl hudi_metadata table
value function #46137 (#52015)
61630b8b432 is described below
commit 61630b8b432b5d8c371e7c479dfe3f67c0f7a056
Author: Socrates <[email protected]>
AuthorDate: Fri Jun 20 16:04:40 2025 +0800
branch-3.1: [multi-catalog](hudi) impl hudi_metadata table value function
#46137 (#52015)
bp #46137
---
be/src/vec/exec/scan/vmeta_scanner.cpp | 23 +++
be/src/vec/exec/scan/vmeta_scanner.h | 2 +
.../doris/catalog/BuiltinTableValuedFunctions.java | 2 +
.../hudi/source/HudiCachedMetaClientProcessor.java | 21 ++-
.../hudi/source/HudiMetadataCacheMgr.java | 10 +-
.../expressions/functions/table/HudiMeta.java | 56 ++++++++
.../visitor/TableValuedFunctionVisitor.java | 5 +
.../tablefunction/HudiTableValuedFunction.java | 156 +++++++++++++++++++++
.../doris/tablefunction/MetadataGenerator.java | 103 ++++++++++----
.../tablefunction/MetadataTableValuedFunction.java | 2 +
.../doris/tablefunction/TableValuedFunctionIf.java | 4 +-
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PlanNodes.thrift | 8 ++
gensrc/thrift/Types.thrift | 7 +-
.../data/external_table_p2/hudi/test_hudi_meta.out | Bin 0 -> 4093 bytes
.../external_table_p2/hudi/test_hudi_meta.groovy | 46 ++++++
16 files changed, 403 insertions(+), 43 deletions(-)
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 7fbc3ed2d37..ff68fd67637 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -234,6 +234,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange&
meta_scan_range) {
case TMetadataType::ICEBERG:
RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range,
&request));
break;
+ case TMetadataType::HUDI:
+ RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range,
&request));
+ break;
case TMetadataType::BACKENDS:
RETURN_IF_ERROR(_build_backends_metadata_request(meta_scan_range,
&request));
break;
@@ -316,6 +319,26 @@ Status VMetaScanner::_build_iceberg_metadata_request(const
TMetaScanRange& meta_
return Status::OK();
}
+// Fill `request` so the FE returns hudi metadata rows for this scan range.
+// Fails if the scan range carries no hudi params.
+Status VMetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
+                                                  TFetchSchemaTableDataRequest* request) {
+    VLOG_CRITICAL << "VMetaScanner::_build_hudi_metadata_request";
+    // Guard: nothing to forward when the FE did not attach THudiMetadataParams.
+    if (!meta_scan_range.__isset.hudi_params) {
+        return Status::InternalError("Can not find THudiMetadataParams from meta_scan_range.");
+    }
+
+    // Wrap the hudi params into the generic metadata-table request params.
+    TMetadataTableRequestParams hudi_request_params;
+    hudi_request_params.__set_metadata_type(TMetadataType::HUDI);
+    hudi_request_params.__set_hudi_metadata_params(meta_scan_range.hudi_params);
+
+    // Target the FE metadata table; the cluster name is left empty.
+    request->__set_cluster_name("");
+    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
+    request->__set_metada_table_params(hudi_request_params);
+    return Status::OK();
+}
+
Status VMetaScanner::_build_backends_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_backends_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h
b/be/src/vec/exec/scan/vmeta_scanner.h
index 350e0fbf807..3942fd793d9 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -66,6 +66,8 @@ private:
Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
Status _build_iceberg_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest*
request);
+ Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
+ TFetchSchemaTableDataRequest* request);
Status _build_backends_metadata_request(const TMetaScanRange&
meta_scan_range,
TFetchSchemaTableDataRequest*
request);
Status _build_frontends_metadata_request(const TMetaScanRange&
meta_scan_range,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index 88c98162093..6b691c9526f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -24,6 +24,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
+import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
@@ -51,6 +52,7 @@ public class BuiltinTableValuedFunctions implements
FunctionHelper {
tableValued(FrontendsDisks.class, "frontends_disks"),
tableValued(GroupCommit.class, "group_commit"),
tableValued(Local.class, "local"),
+ tableValued(HudiMeta.class, "hudi_meta"),
tableValued(IcebergMeta.class, "iceberg_meta"),
tableValued(Hdfs.class, "hdfs"),
tableValued(HttpStream.class, "http_stream"),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
index 07726a54ffe..9ed1007e804 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
@@ -47,23 +47,22 @@ public class HudiCachedMetaClientProcessor {
true,
null);
- this.hudiTableMetaClientCache =
- partitionCacheFactory.buildCache(
- this::createHoodieTableMetaClient,
- null,
- executor);
+ this.hudiTableMetaClientCache = partitionCacheFactory.buildCache(
+ this::createHoodieTableMetaClient,
+ null,
+ executor);
}
private HoodieTableMetaClient
createHoodieTableMetaClient(HudiCachedClientKey key) {
LOG.debug("create hudi table meta client for {}.{}", key.getDbName(),
key.getTbName());
HadoopStorageConfiguration hadoopStorageConfiguration = new
HadoopStorageConfiguration(key.getConf());
return HiveMetaStoreClientHelper.ugiDoAs(
- key.getConf(),
- () -> HoodieTableMetaClient
- .builder()
- .setConf(hadoopStorageConfiguration)
- .setBasePath(key.getHudiBasePath())
- .build());
+ key.getConf(),
+ () -> HoodieTableMetaClient
+ .builder()
+ .setConf(hadoopStorageConfiguration)
+ .setBasePath(key.getHudiBasePath())
+ .build());
}
public HoodieTableMetaClient getHoodieTableMetaClient(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
index 4ede5c73cfa..8d921a2f3d3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
@@ -17,7 +17,7 @@
package org.apache.doris.datasource.hudi.source;
-import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import com.google.common.collect.Maps;
@@ -36,7 +36,7 @@ public class HudiMetadataCacheMgr {
this.executor = executor;
}
- public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog
catalog) {
+ public HudiPartitionProcessor getPartitionProcessor(CatalogIf catalog) {
return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId
-> {
if (catalog instanceof HMSExternalCatalog) {
return new HudiCachedPartitionProcessor(catalogId, executor);
@@ -46,7 +46,7 @@ public class HudiMetadataCacheMgr {
});
}
- public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog
catalog) {
+ public HudiCachedFsViewProcessor getFsViewProcessor(CatalogIf catalog) {
return fsViewProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
if (catalog instanceof HMSExternalCatalog) {
return new HudiCachedFsViewProcessor(executor);
@@ -56,7 +56,7 @@ public class HudiMetadataCacheMgr {
});
}
- public HudiCachedMetaClientProcessor
getHudiMetaClientProcessor(ExternalCatalog catalog) {
+ public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(CatalogIf
catalog) {
return metaClientProcessors.computeIfAbsent(catalog.getId(), catalogId
-> {
if (catalog instanceof HMSExternalCatalog) {
return new HudiCachedMetaClientProcessor(executor);
@@ -126,7 +126,7 @@ public class HudiMetadataCacheMgr {
}
}
- public Map<String, Map<String, String>> getCacheStats(ExternalCatalog
catalog) {
+ public Map<String, Map<String, String>> getCacheStats(CatalogIf catalog) {
Map<String, Map<String, String>> res = Maps.newHashMap();
HudiCachedPartitionProcessor partitionProcessor =
(HudiCachedPartitionProcessor) getPartitionProcessor(catalog);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java
new file mode 100644
index 00000000000..2d62252fd2d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.HudiTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/**
+ * Nereids expression for the {@code hudi_meta} table valued function.
+ * Validating the arguments and resolving the schema are delegated to {@link HudiTableValuedFunction}.
+ */
+public class HudiMeta extends TableValuedFunction {
+    public HudiMeta(Properties properties) {
+        super("hudi_meta", properties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
+    }
+
+    /**
+     * Builds the catalog-side function from this expression's properties.
+     *
+     * @throws AnalysisException wrapping whatever the HudiTableValuedFunction constructor threw
+     */
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> tvfArgs = getTVFProperties().getMap();
+            return new HudiTableValuedFunction(tvfArgs);
+        } catch (Throwable cause) {
+            throw new AnalysisException("Can not build HudiTableValuedFunction by "
+                    + this + ": " + cause.getMessage(), cause);
+        }
+    }
+
+    /** Double-dispatches to {@code visitHudiMeta}. */
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitHudiMeta(this, context);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 0b4b57e11dc..dfb59f0be2c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -24,6 +24,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
+import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
@@ -88,6 +89,10 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(httpStream, context);
}
+    /** Visits a {@code hudi_meta} call; by default handled like any other table valued function. */
+    default R visitHudiMeta(HudiMeta hudiMeta, C context) {
+        return this.visitTableValuedFunction(hudiMeta, context);
+    }
+
default R visitIcebergMeta(IcebergMeta icebergMeta, C context) {
return visitTableValuedFunction(icebergMeta, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
new file mode 100644
index 00000000000..87791df380a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.THudiMetadataParams;
+import org.apache.doris.thrift.THudiQueryType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the table valued function
+ * hudi_meta("table" = "ctl.db.tbl", "query_type" = "timeline").
+ *
+ * <p>Both properties are required. Property names and the query type are matched
+ * case-insensitively. The caller must hold SELECT privilege on the referenced table.
+ * The rows themselves are produced on the FE by {@link MetadataGenerator}.
+ */
+public class HudiTableValuedFunction extends MetadataTableValuedFunction {
+
+    public static final String NAME = "hudi_meta";
+    private static final String TABLE = "table";
+    private static final String QUERY_TYPE = "query_type";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
+
+    // Output schema for query_type = "timeline".
+    private static final ImmutableList<Column> SCHEMA_TIMELINE = ImmutableList.of(
+            new Column("timestamp", PrimitiveType.STRING, false),
+            new Column("action", PrimitiveType.STRING, false),
+            new Column("file_name", PrimitiveType.STRING, false),
+            new Column("state", PrimitiveType.STRING, false),
+            new Column("state_transition_time", PrimitiveType.STRING, false));
+
+    // Lower-cased column name -> position of that column in SCHEMA_TIMELINE.
+    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
+
+    static {
+        ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
+        for (int i = 0; i < SCHEMA_TIMELINE.size(); i++) {
+            builder.put(SCHEMA_TIMELINE.get(i).getName().toLowerCase(), i);
+        }
+        COLUMN_TO_INDEX = builder.build();
+    }
+
+    /**
+     * Returns the position of {@code columnName} (case-insensitive) in the timeline schema,
+     * or {@code null} if there is no such column.
+     */
+    public static Integer getColumnIndexFromColumnName(String columnName) {
+        return COLUMN_TO_INDEX.get(columnName.toLowerCase());
+    }
+
+    private final THudiQueryType queryType;
+
+    // The hudi table being inspected, given as catalog.database.table.
+    private final TableName hudiTableName;
+
+    /**
+     * Validates the tvf properties and checks the caller's SELECT privilege on the target table.
+     *
+     * @param params tvf properties; keys are matched case-insensitively
+     * @throws AnalysisException if a property is unknown or missing, the table name is not of
+     *         the form ctl.db.tbl, the caller lacks SELECT privilege, or the query type is
+     *         not a {@link THudiQueryType}
+     */
+    public HudiTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        Map<String, String> validParams = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : params.entrySet()) {
+            String normalizedKey = entry.getKey().toLowerCase();
+            if (!PROPERTIES_SET.contains(normalizedKey)) {
+                throw new AnalysisException("'" + entry.getKey() + "' is invalid property");
+            }
+            validParams.put(normalizedKey, entry.getValue());
+        }
+        String tableName = validParams.get(TABLE);
+        String queryTypeString = validParams.get(QUERY_TYPE);
+        if (tableName == null || queryTypeString == null) {
+            throw new AnalysisException("Invalid hudi metadata query");
+        }
+        // check ctl, db, tbl
+        String[] names = tableName.split("\\.");
+        if (names.length != 3) {
+            throw new AnalysisException(
+                    "The hudi table name contains the catalogName, databaseName, and tableName");
+        }
+        this.hudiTableName = new TableName(names[0], names[1], names[2]);
+        // check auth
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkTblPriv(ConnectContext.get(), this.hudiTableName, PrivPredicate.SELECT)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
+                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
+                    this.hudiTableName.getDb() + ": " + this.hudiTableName.getTbl());
+        }
+        this.queryType = parseQueryType(queryTypeString);
+    }
+
+    /**
+     * Parses the user-supplied query type (case-insensitive) into a {@link THudiQueryType}.
+     *
+     * @throws AnalysisException naming the raw input if it is not a known query type
+     */
+    private static THudiQueryType parseQueryType(String queryTypeString) throws AnalysisException {
+        try {
+            return THudiQueryType.valueOf(queryTypeString.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            // Report what the user typed; the parsed field has not been assigned yet here.
+            throw new AnalysisException("Unsupported hudi metadata query type: " + queryTypeString);
+        }
+    }
+
+    public THudiQueryType getHudiQueryType() {
+        return queryType;
+    }
+
+    @Override
+    public TMetadataType getMetadataType() {
+        return TMetadataType.HUDI;
+    }
+
+    /**
+     * Builds the scan range sent to the BE. It carries the query type and the
+     * catalog/database/table the BE forwards back to the FE.
+     */
+    @Override
+    public TMetaScanRange getMetaScanRange() {
+        TMetaScanRange metaScanRange = new TMetaScanRange();
+        metaScanRange.setMetadataType(TMetadataType.HUDI);
+        // set hudi metadata params
+        THudiMetadataParams hudiMetadataParams = new THudiMetadataParams();
+        hudiMetadataParams.setHudiQueryType(queryType);
+        hudiMetadataParams.setCatalog(hudiTableName.getCtl());
+        hudiMetadataParams.setDatabase(hudiTableName.getDb());
+        hudiMetadataParams.setTable(hudiTableName.getTbl());
+        metaScanRange.setHudiParams(hudiMetadataParams);
+        return metaScanRange;
+    }
+
+    @Override
+    public String getTableName() {
+        return "HudiMetadataTableValuedFunction";
+    }
+
+    /**
+     * Returns the columns this tvf exposes for its query type. The row data is produced
+     * by {@link MetadataGenerator}.
+     *
+     * @return the timeline schema for TIMELINE queries, otherwise an empty list
+     */
+    @Override
+    public List<Column> getTableColumns() {
+        if (queryType == THudiQueryType.TIMELINE) {
+            return SCHEMA_TIMELINE;
+        }
+        return Lists.newArrayList();
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 1420e03f375..d3b8529c3c1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -58,6 +58,7 @@ import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
@@ -82,6 +83,8 @@ import org.apache.doris.thrift.TBackendsMetadataParams;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
+import org.apache.doris.thrift.THudiMetadataParams;
+import org.apache.doris.thrift.THudiQueryType;
import org.apache.doris.thrift.TIcebergMetadataParams;
import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TJobsMetadataParams;
@@ -105,6 +108,9 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.iceberg.Snapshot;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -222,6 +228,9 @@ public class MetadataGenerator {
case ICEBERG:
result = icebergMetadataResult(params);
break;
+ case HUDI:
+ result = hudiMetadataResult(params);
+ break;
case BACKENDS:
result = backendsMetadataResult(params);
break;
@@ -376,6 +385,48 @@ public class MetadataGenerator {
return result;
}
+    /**
+     * Builds the rows for the hudi_meta table valued function.
+     *
+     * <p>Only tables in an {@link HMSExternalCatalog} are supported. The table base path is read
+     * from the metastore. The hudi meta client comes from the catalog's cached processor.
+     *
+     * @param params request params; must carry {@code hudiMetadataParams}
+     * @return for TIMELINE queries, one row per instant of the active timeline. Otherwise an error
+     *         result when the params are missing, the catalog does not exist or is not an HMS
+     *         catalog, or the query type is unsupported.
+     */
+    private static TFetchSchemaTableDataResult hudiMetadataResult(TMetadataTableRequestParams params) {
+        if (!params.isSetHudiMetadataParams()) {
+            return errorResult("Hudi metadata params is not set.");
+        }
+
+        THudiMetadataParams hudiMetadataParams = params.getHudiMetadataParams();
+        THudiQueryType hudiQueryType = hudiMetadataParams.getHudiQueryType();
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(hudiMetadataParams.getCatalog());
+        if (catalog == null) {
+            return errorResult("The specified catalog does not exist:" + hudiMetadataParams.getCatalog());
+        }
+        // The base path and hadoop conf below come from the hive metastore. Reject other catalog
+        // types with an error result rather than failing with a ClassCastException.
+        if (!(catalog instanceof HMSExternalCatalog)) {
+            return errorResult("The specified catalog is not a hive metastore catalog: "
+                    + hudiMetadataParams.getCatalog());
+        }
+        HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
+        HudiCachedMetaClientProcessor hudiMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getHudiMetadataCacheMgr().getHudiMetaClientProcessor(hmsCatalog);
+        String hudiBasePathString = hmsCatalog.getClient()
+                .getTable(hudiMetadataParams.getDatabase(), hudiMetadataParams.getTable())
+                .getSd().getLocation();
+        Configuration conf = hmsCatalog.getConfiguration();
+
+        List<TRow> dataBatch = Lists.newArrayList();
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+
+        switch (hudiQueryType) {
+            case TIMELINE:
+                HoodieTimeline timeline = hudiMetadataCache.getHoodieTableMetaClient(
+                        hudiMetadataParams.getDatabase(), hudiMetadataParams.getTable(),
+                        hudiBasePathString, conf).getActiveTimeline();
+                // Column order must match HudiTableValuedFunction's timeline schema.
+                for (HoodieInstant instant : timeline.getInstants()) {
+                    TRow trow = new TRow();
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getTimestamp()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getAction()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getFileName()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getState().name()));
+                    trow.addToColumnValue(new TCell().setStringVal(instant.getStateTransitionTime()));
+                    dataBatch.add(trow);
+                }
+                break;
+            default:
+                return errorResult("Unsupported hudi inspect type: " + hudiQueryType);
+        }
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
+
private static TFetchSchemaTableDataResult
backendsMetadataResult(TMetadataTableRequestParams params) {
if (!params.isSetBackendsMetadataParams()) {
return errorResult("backends metadata param is not set.");
@@ -1098,7 +1149,7 @@ public class MetadataGenerator {
}
private static void tableOptionsForInternalCatalog(UserIdentity
currentUserIdentity,
- CatalogIf catalog, DatabaseIf database, List<TableIf>
tables, List<TRow> dataBatch) {
+ CatalogIf catalog, DatabaseIf database, List<TableIf> tables,
List<TRow> dataBatch) {
for (TableIf table : tables) {
if (!(table instanceof OlapTable)) {
continue;
@@ -1151,7 +1202,7 @@ public class MetadataGenerator {
}
private static void tableOptionsForExternalCatalog(UserIdentity
currentUserIdentity,
- CatalogIf catalog, DatabaseIf database, List<TableIf>
tables, List<TRow> dataBatch) {
+ CatalogIf catalog, DatabaseIf database, List<TableIf> tables,
List<TRow> dataBatch) {
for (TableIf table : tables) {
if
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUserIdentity,
catalog.getName(),
database.getFullName(), table.getName(),
PrivPredicate.SHOW)) {
@@ -1193,7 +1244,7 @@ public class MetadataGenerator {
String clg = params.getCatalog();
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(clg);
if (catalog == null) {
- // catalog is NULL let return empty to BE
+ // catalog is NULL let return empty to BE
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
@@ -1202,7 +1253,8 @@ public class MetadataGenerator {
if (database == null) {
// BE gets the database id list from FE and then invokes this
interface
// per database. there is a chance that in between database can be
dropped.
- // so need to handle database not exist case and return ok so that
BE continue the
+ // so need to handle database not exist case and return ok so that
BE continue
+ // the
// loop with next database.
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
@@ -1220,7 +1272,7 @@ public class MetadataGenerator {
}
private static void tablePropertiesForInternalCatalog(UserIdentity
currentUserIdentity,
- CatalogIf catalog, DatabaseIf database, List<TableIf> tables,
List<TRow> dataBatch) {
+ CatalogIf catalog, DatabaseIf database, List<TableIf> tables,
List<TRow> dataBatch) {
for (TableIf table : tables) {
if (!(table instanceof OlapTable)) {
continue;
@@ -1245,7 +1297,7 @@ public class MetadataGenerator {
continue;
}
- Map<String, String> propertiesMap = property.getProperties();
+ Map<String, String> propertiesMap = property.getProperties();
propertiesMap.forEach((key, value) -> {
TRow trow = new TRow();
trow.addToColumnValue(new
TCell().setStringVal(catalog.getName())); // TABLE_CATALOG
@@ -1268,7 +1320,8 @@ public class MetadataGenerator {
database.getFullName(), table.getName(),
PrivPredicate.SHOW)) {
continue;
}
- // Currently for external catalog, we put properties as empty, can
extend in future
+ // Currently for external catalog, we put properties as empty, can
extend in
+ // future
TRow trow = new TRow();
trow.addToColumnValue(new
TCell().setStringVal(catalog.getName())); // TABLE_CATALOG
trow.addToColumnValue(new
TCell().setStringVal(database.getFullName())); // TABLE_SCHEMA
@@ -1300,7 +1353,7 @@ public class MetadataGenerator {
List<TRow> dataBatch = Lists.newArrayList();
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(clg);
if (catalog == null) {
- // catalog is NULL let return empty to BE
+ // catalog is NULL let return empty to BE
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
@@ -1309,8 +1362,8 @@ public class MetadataGenerator {
if (database == null) {
// BE gets the database id list from FE and then invokes this
interface
// per database. there is a chance that in between database can be
dropped.
- // so need to handle database not exist case and return ok so that
BE continue the
- // loop with next database.
+ // so need to handle database not exist case and return ok so that
BE continue
+ // the loop with next database.
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
@@ -1353,7 +1406,7 @@ public class MetadataGenerator {
}
private static void partitionsForInternalCatalog(UserIdentity
currentUserIdentity,
- CatalogIf catalog, DatabaseIf database, List<TableIf> tables,
List<TRow> dataBatch) {
+ CatalogIf catalog, DatabaseIf database, List<TableIf> tables,
List<TRow> dataBatch) {
for (TableIf table : tables) {
if (!(table instanceof OlapTable)) {
continue;
@@ -1379,8 +1432,8 @@ public class MetadataGenerator {
trow.addToColumnValue(new
TCell().setStringVal(partition.getName())); // PARTITION_NAME
trow.addToColumnValue(new TCell().setStringVal("NULL"));
// SUBPARTITION_NAME (always null)
- trow.addToColumnValue(new TCell().setIntVal(0));
//PARTITION_ORDINAL_POSITION (not available)
- trow.addToColumnValue(new TCell().setIntVal(0));
//SUBPARTITION_ORDINAL_POSITION (not available)
+ trow.addToColumnValue(new TCell().setIntVal(0)); //
PARTITION_ORDINAL_POSITION (not available)
+ trow.addToColumnValue(new TCell().setIntVal(0)); //
SUBPARTITION_ORDINAL_POSITION (not available)
trow.addToColumnValue(new TCell().setStringVal(
partitionInfo.getType().toString())); //
PARTITION_METHOD
trow.addToColumnValue(new TCell().setStringVal("NULL"));
// SUBPARTITION_METHOD(always null)
@@ -1399,17 +1452,17 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setStringVal(
item.getItemsSql())); // PARITION DESC
}
- trow.addToColumnValue(new
TCell().setLongVal(partition.getRowCount())); //TABLE_ROWS (PARTITION row)
- trow.addToColumnValue(new
TCell().setLongVal(partition.getAvgRowLength())); //AVG_ROW_LENGTH
- trow.addToColumnValue(new
TCell().setLongVal(partition.getDataLength())); //DATA_LENGTH
- trow.addToColumnValue(new TCell().setIntVal(0));
//MAX_DATA_LENGTH (not available)
- trow.addToColumnValue(new TCell().setIntVal(0));
//INDEX_LENGTH (not available)
- trow.addToColumnValue(new TCell().setIntVal(0));
//DATA_FREE (not available)
- trow.addToColumnValue(new TCell().setStringVal("NULL"));
//CREATE_TIME (not available)
+ trow.addToColumnValue(new
TCell().setLongVal(partition.getRowCount())); // TABLE_ROWS (PARTITION)
+ trow.addToColumnValue(new
TCell().setLongVal(partition.getAvgRowLength())); // AVG_ROW_LENGTH
+ trow.addToColumnValue(new
TCell().setLongVal(partition.getDataLength())); // DATA_LENGTH
+ trow.addToColumnValue(new TCell().setIntVal(0)); //
MAX_DATA_LENGTH (not available)
+ trow.addToColumnValue(new TCell().setIntVal(0)); //
INDEX_LENGTH (not available)
+ trow.addToColumnValue(new TCell().setIntVal(0)); //
DATA_FREE (not available)
+ trow.addToColumnValue(new TCell().setStringVal("NULL"));
// CREATE_TIME (not available)
trow.addToColumnValue(new TCell().setStringVal(
-
TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); //UPDATE_TIME
+
TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); // UPDATE_TIME
trow.addToColumnValue(new TCell().setStringVal("NULL"));
// CHECK_TIME (not available)
- trow.addToColumnValue(new TCell().setIntVal(0));
//CHECKSUM (not available)
+ trow.addToColumnValue(new TCell().setIntVal(0)); //
CHECKSUM (not available)
trow.addToColumnValue(new TCell().setStringVal("")); //
PARTITION_COMMENT (not available)
trow.addToColumnValue(new TCell().setStringVal("")); //
NODEGROUP (not available)
trow.addToColumnValue(new TCell().setStringVal("")); //
TABLESPACE_NAME (not available)
@@ -1510,7 +1563,7 @@ public class MetadataGenerator {
List<TRow> dataBatch = Lists.newArrayList();
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(clg);
if (catalog == null) {
- // catalog is NULL let return empty to BE
+ // catalog is NULL let return empty to BE
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
return result;
@@ -1519,7 +1572,8 @@ public class MetadataGenerator {
if (database == null) {
// BE gets the database id list from FE and then invokes this
interface
// per database. there is a chance that in between database can be
dropped.
- // so need to handle database not exist case and return ok so that
BE continue the
+ // so need to handle database not exist case and return ok so that
BE continue
+ // the
// loop with next database.
result.setDataBatch(dataBatch);
result.setStatus(new TStatus(TStatusCode.OK));
@@ -1670,4 +1724,3 @@ public class MetadataGenerator {
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 7bd28f363e7..074f3d66c96 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -41,6 +41,8 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
return
FrontendsDisksTableValuedFunction.getColumnIndexFromColumnName(columnName);
case ICEBERG:
return
IcebergTableValuedFunction.getColumnIndexFromColumnName(columnName);
+ case HUDI:
+ return
HudiTableValuedFunction.getColumnIndexFromColumnName(columnName);
case CATALOGS:
return
CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case MATERIALIZED_VIEWS:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index eb323a76672..133c1885a84 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -44,7 +44,7 @@ public abstract class TableValuedFunctionIf {
// All table functions should be registered here
public static TableValuedFunctionIf getTableFunction(String funcName,
Map<String, String> params)
- throws
AnalysisException {
+ throws AnalysisException {
switch (funcName.toLowerCase()) {
case NumbersTableValuedFunction.NAME:
return new NumbersTableValuedFunction(params);
@@ -58,6 +58,8 @@ public abstract class TableValuedFunctionIf {
return new LocalTableValuedFunction(params);
case IcebergTableValuedFunction.NAME:
return new IcebergTableValuedFunction(params);
+ case HudiTableValuedFunction.NAME:
+ return new HudiTableValuedFunction(params);
case BackendsTableValuedFunction.NAME:
return new BackendsTableValuedFunction(params);
case FrontendsTableValuedFunction.NAME:
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 92205c5ae0f..19d91f0fded 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1037,6 +1037,7 @@ struct TMetadataTableRequestParams {
11: optional PlanNodes.TPartitionsMetadataParams partitions_metadata_params
12: optional PlanNodes.TMetaCacheStatsParams meta_cache_stats_params
13: optional PlanNodes.TPartitionValuesMetadataParams
partition_values_metadata_params
+ 14: optional PlanNodes.THudiMetadataParams hudi_metadata_params
}
struct TSchemaTableRequestParams {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c92b1c91523..611c58d2bfd 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -541,6 +541,13 @@ struct TIcebergMetadataParams {
4: optional string table
}
+struct THudiMetadataParams {
+ 1: optional Types.THudiQueryType hudi_query_type
+ 2: optional string catalog
+ 3: optional string database
+ 4: optional string table
+}
+
struct TBackendsMetadataParams {
1: optional string cluster_name
}
@@ -601,6 +608,7 @@ struct TMetaScanRange {
9: optional TPartitionsMetadataParams partitions_params
10: optional TMetaCacheStatsParams meta_cache_stats_params
11: optional TPartitionValuesMetadataParams partition_values_params
+ 12: optional THudiMetadataParams hudi_params
}
// Specification of an individual data range which is held in its entirety
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index dbb385d0d1d..58716f51f3f 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -735,13 +735,18 @@ enum TMetadataType {
TASKS,
WORKLOAD_SCHED_POLICY,
PARTITIONS,
- PARTITION_VALUES;
+ PARTITION_VALUES,
+ HUDI,
}
enum TIcebergQueryType {
SNAPSHOTS
}
+enum THudiQueryType {
+ TIMELINE
+}
+
// represent a user identity
struct TUserIdentity {
1: optional string username
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out
new file mode 100644
index 00000000000..0c312efd3c0
Binary files /dev/null and
b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out differ
diff --git
a/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy
b/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy
new file mode 100644
index 00000000000..0fe650aded9
--- /dev/null
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_meta",
"p2,external,hudi,external_remote,external_remote_hudi") {
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable hudi test")
+ return // skip the whole suite; otherwise it would go on to create a catalog from unset hudiEmrCatalog props
+ }
+ String catalog_name = "test_hudi_meta"
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ ${props}
+ );
+ """
+
+ sql """ switch ${catalog_name};"""
+ sql """ use regression_hudi;"""
+ sql """ set enable_fallback_to_original_planner=false """
+
+ qt_hudi_meta1 """ select * from
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_non_partition",
"query_type" = "timeline"); """
+ qt_hudi_meta2 """ select * from
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_mor_non_partition",
"query_type" = "timeline"); """
+ qt_hudi_meta3 """ select * from
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_partition",
"query_type" = "timeline"); """
+ qt_hudi_meta4 """ select * from
hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_partition",
"query_type" = "timeline"); """
+ // NOTE(review): hudi_meta4 repeats hudi_meta3's cow_partition query; a MOR partitioned table may have been intended (changing it requires regenerating test_hudi_meta.out)
+ qt_hudi_meta5 """ select * from
hudi_meta("table"="${catalog_name}.regression_hudi.timetravel_cow",
"query_type" = "timeline"); """
+ qt_hudi_meta6 """ select * from
hudi_meta("table"="${catalog_name}.regression_hudi.timetravel_mor",
"query_type" = "timeline"); """
+
+ sql """drop catalog if exists ${catalog_name};"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]