This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1-lakehouse in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4c313b076833d16d236430b92259cc58291c046e
Author: Socrates <[email protected]>
AuthorDate: Fri Jan 3 17:07:55 2025 +0800

[multi-catalog](hudi) impl hudi_metadata table value function (#46137)

### What problem does this PR solve?

### Release note

Implement the `hudi_meta` table-valued function (TVF), which queries Hudi table metadata. For example, the following query lists the table's timeline:

```sql
select * from hudi_meta("table"="hive_krb.regression_hudi.timetravel_cow","query_type" = "timeline");
+-------------------+--------+--------------------------+-----------+-----------------------+
| timestamp         | action | file_name                | state     | state_transition_time |
+-------------------+--------+--------------------------+-----------+-----------------------+
| 20240724195843565 | commit | 20240724195843565.commit | COMPLETED | 20240724195844269     |
| 20240724195845718 | commit | 20240724195845718.commit | COMPLETED | 20240724195846653     |
| 20240724195848377 | commit | 20240724195848377.commit | COMPLETED | 20240724195849337     |
| 20240724195850799 | commit | 20240724195850799.commit | COMPLETED | 20240724195851676     |
+-------------------+--------+--------------------------+-----------+-----------------------+
4 rows in set (0.03 sec)
```

See the documentation: https://github.com/apache/doris-website/pull/1673
---
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  23 +++
 be/src/vec/exec/scan/vmeta_scanner.h               |   2 +
 .../doris/catalog/BuiltinTableValuedFunctions.java |   2 +
 .../hudi/source/HudiCachedMetaClientProcessor.java |  21 ++-
 .../hudi/source/HudiMetadataCacheMgr.java          |  10 +-
 .../expressions/functions/table/HudiMeta.java      |  56 ++++++++
 .../visitor/TableValuedFunctionVisitor.java        |   5 +
 .../tablefunction/HudiTableValuedFunction.java     | 156 +++++++++++++++++++++
 .../doris/tablefunction/MetadataGenerator.java     | 105 ++++++++++----
 .../tablefunction/MetadataTableValuedFunction.java |   2 +
 .../doris/tablefunction/TableValuedFunctionIf.java |   4 +-
 gensrc/thrift/FrontendService.thrift               |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   8 ++
 gensrc/thrift/Types.thrift                         |   7 +-
.../data/external_table_p2/hudi/test_hudi_meta.out | Bin 0 -> 4093 bytes .../external_table_p2/hudi/test_hudi_meta.groovy | 46 ++++++ 16 files changed, 404 insertions(+), 44 deletions(-) diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 40826f308ff..03917b65bd9 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -245,6 +245,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { case TMetadataType::ICEBERG: RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request)); break; + case TMetadataType::HUDI: + RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request)); + break; case TMetadataType::BACKENDS: RETURN_IF_ERROR(_build_backends_metadata_request(meta_scan_range, &request)); break; @@ -327,6 +330,26 @@ Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_ return Status::OK(); } +Status VMetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_hudi_metadata_request"; + if (!meta_scan_range.__isset.hudi_params) { + return Status::InternalError("Can not find THudiMetadataParams from meta_scan_range."); + } + + // create request + request->__set_cluster_name(""); + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::HUDI); + metadata_table_params.__set_hudi_metadata_params(meta_scan_range.hudi_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::_build_backends_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_backends_metadata_request"; diff --git 
a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 600624ee636..bb023f5b69c 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -71,6 +71,8 @@ private: Status _fetch_metadata(const TMetaScanRange& meta_scan_range); Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); Status _build_backends_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); Status _build_frontends_metadata_request(const TMetaScanRange& meta_scan_range, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index 88c98162093..6b691c9526f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit; import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream; +import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta; import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; @@ -51,6 +52,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(FrontendsDisks.class, "frontends_disks"), tableValued(GroupCommit.class, "group_commit"), tableValued(Local.class, "local"), + 
tableValued(HudiMeta.class, "hudi_meta"), tableValued(IcebergMeta.class, "iceberg_meta"), tableValued(Hdfs.class, "hdfs"), tableValued(HttpStream.class, "http_stream"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java index 07726a54ffe..9ed1007e804 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java @@ -47,23 +47,22 @@ public class HudiCachedMetaClientProcessor { true, null); - this.hudiTableMetaClientCache = - partitionCacheFactory.buildCache( - this::createHoodieTableMetaClient, - null, - executor); + this.hudiTableMetaClientCache = partitionCacheFactory.buildCache( + this::createHoodieTableMetaClient, + null, + executor); } private HoodieTableMetaClient createHoodieTableMetaClient(HudiCachedClientKey key) { LOG.debug("create hudi table meta client for {}.{}", key.getDbName(), key.getTbName()); HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(key.getConf()); return HiveMetaStoreClientHelper.ugiDoAs( - key.getConf(), - () -> HoodieTableMetaClient - .builder() - .setConf(hadoopStorageConfiguration) - .setBasePath(key.getHudiBasePath()) - .build()); + key.getConf(), + () -> HoodieTableMetaClient + .builder() + .setConf(hadoopStorageConfiguration) + .setBasePath(key.getHudiBasePath()) + .build()); } public HoodieTableMetaClient getHoodieTableMetaClient( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java index 4ede5c73cfa..8d921a2f3d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java @@ -17,7 +17,7 @@ package org.apache.doris.datasource.hudi.source; -import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.hive.HMSExternalCatalog; import com.google.common.collect.Maps; @@ -36,7 +36,7 @@ public class HudiMetadataCacheMgr { this.executor = executor; } - public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) { + public HudiPartitionProcessor getPartitionProcessor(CatalogIf catalog) { return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId -> { if (catalog instanceof HMSExternalCatalog) { return new HudiCachedPartitionProcessor(catalogId, executor); @@ -46,7 +46,7 @@ public class HudiMetadataCacheMgr { }); } - public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) { + public HudiCachedFsViewProcessor getFsViewProcessor(CatalogIf catalog) { return fsViewProcessors.computeIfAbsent(catalog.getId(), catalogId -> { if (catalog instanceof HMSExternalCatalog) { return new HudiCachedFsViewProcessor(executor); @@ -56,7 +56,7 @@ public class HudiMetadataCacheMgr { }); } - public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(ExternalCatalog catalog) { + public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(CatalogIf catalog) { return metaClientProcessors.computeIfAbsent(catalog.getId(), catalogId -> { if (catalog instanceof HMSExternalCatalog) { return new HudiCachedMetaClientProcessor(executor); @@ -126,7 +126,7 @@ public class HudiMetadataCacheMgr { } } - public Map<String, Map<String, String>> getCacheStats(ExternalCatalog catalog) { + public Map<String, Map<String, String>> getCacheStats(CatalogIf catalog) { Map<String, Map<String, String>> res = Maps.newHashMap(); HudiCachedPartitionProcessor partitionProcessor = (HudiCachedPartitionProcessor) getPartitionProcessor(catalog); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java new file mode 100644 index 00000000000..2d62252fd2d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/HudiMeta.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.HudiTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** hudi_meta */ +public class HudiMeta extends TableValuedFunction { + public HudiMeta(Properties properties) { + super("hudi_meta", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + return new HudiTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build HudiTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { + return visitor.visitHudiMeta(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 0b4b57e11dc..dfb59f0be2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit; 
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream; +import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta; import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; @@ -88,6 +89,10 @@ public interface TableValuedFunctionVisitor<R, C> { return visitTableValuedFunction(httpStream, context); } + default R visitHudiMeta(HudiMeta hudiMeta, C context) { + return visitTableValuedFunction(hudiMeta, context); + } + default R visitIcebergMeta(IcebergMeta icebergMeta, C context) { return visitTableValuedFunction(icebergMeta, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java new file mode 100644 index 00000000000..87791df380a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.THudiMetadataParams; +import org.apache.doris.thrift.THudiQueryType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +/** + * The Implement of table valued function + * hudi_meta("table" = "ctl.db.tbl", "query_type" = "timeline"). 
+ */ +public class HudiTableValuedFunction extends MetadataTableValuedFunction { + + public static final String NAME = "hudi_meta"; + private static final String TABLE = "table"; + private static final String QUERY_TYPE = "query_type"; + + private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE); + + private static final ImmutableList<Column> SCHEMA_TIMELINE = ImmutableList.of( + new Column("timestamp", PrimitiveType.STRING, false), + new Column("action", PrimitiveType.STRING, false), + new Column("file_name", PrimitiveType.STRING, false), + new Column("state", PrimitiveType.STRING, false), + new Column("state_transition_time", PrimitiveType.STRING, false)); + + private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA_TIMELINE.size(); i++) { + builder.put(SCHEMA_TIMELINE.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + + public static Integer getColumnIndexFromColumnName(String columnName) { + return COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } + + private THudiQueryType queryType; + + // here tableName represents the name of a table in Hudi. 
+ private final TableName hudiTableName; + + public HudiTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> validParams = Maps.newHashMap(); + for (String key : params.keySet()) { + if (!PROPERTIES_SET.contains(key.toLowerCase())) { + throw new AnalysisException("'" + key + "' is invalid property"); + } + // check ctl, db, tbl + validParams.put(key.toLowerCase(), params.get(key)); + } + String tableName = validParams.get(TABLE); + String queryTypeString = validParams.get(QUERY_TYPE); + if (tableName == null || queryTypeString == null) { + throw new AnalysisException("Invalid hudi metadata query"); + } + String[] names = tableName.split("\\."); + if (names.length != 3) { + throw new AnalysisException("The hudi table name contains the catalogName, databaseName, and tableName"); + } + this.hudiTableName = new TableName(names[0], names[1], names[2]); + // check auth + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), this.hudiTableName, PrivPredicate.SELECT)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + this.hudiTableName.getDb() + ": " + this.hudiTableName.getTbl()); + } + try { + this.queryType = THudiQueryType.valueOf(queryTypeString.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new AnalysisException("Unsupported hudi metadata query type: " + queryType); + } + } + + public THudiQueryType getHudiQueryType() { + return queryType; + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.HUDI; + } + + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.HUDI); + // set hudi metadata params + THudiMetadataParams hudiMetadataParams = new THudiMetadataParams(); + hudiMetadataParams.setHudiQueryType(queryType); + 
hudiMetadataParams.setCatalog(hudiTableName.getCtl()); + hudiMetadataParams.setDatabase(hudiTableName.getDb()); + hudiMetadataParams.setTable(hudiTableName.getTbl()); + metaScanRange.setHudiParams(hudiMetadataParams); + return metaScanRange; + } + + @Override + public String getTableName() { + return "HudiMetadataTableValuedFunction"; + } + + /** + * The tvf can register columns of metadata table + * The data is provided by getHudiMetadataTable in FrontendService + * + * @return metadata columns + * @see org.apache.doris.service.FrontendServiceImpl + */ + @Override + public List<Column> getTableColumns() { + if (queryType == THudiQueryType.TIMELINE) { + return SCHEMA_TIMELINE; + } + return Lists.newArrayList(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index c56dab51af9..d1c09f702b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -56,6 +56,7 @@ import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor; import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergMetadataCache; @@ -82,6 +83,8 @@ import org.apache.doris.thrift.TBackendsMetadataParams; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TFetchSchemaTableDataRequest; import org.apache.doris.thrift.TFetchSchemaTableDataResult; +import org.apache.doris.thrift.THudiMetadataParams; +import org.apache.doris.thrift.THudiQueryType; import 
org.apache.doris.thrift.TIcebergMetadataParams; import org.apache.doris.thrift.TIcebergQueryType; import org.apache.doris.thrift.TJobsMetadataParams; @@ -104,6 +107,9 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.iceberg.Snapshot; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -222,6 +228,9 @@ public class MetadataGenerator { case ICEBERG: result = icebergMetadataResult(params); break; + case HUDI: + result = hudiMetadataResult(params); + break; case BACKENDS: result = backendsMetadataResult(params); break; @@ -376,6 +385,48 @@ public class MetadataGenerator { return result; } + private static TFetchSchemaTableDataResult hudiMetadataResult(TMetadataTableRequestParams params) { + if (!params.isSetHudiMetadataParams()) { + return errorResult("Hudi metadata params is not set."); + } + + THudiMetadataParams hudiMetadataParams = params.getHudiMetadataParams(); + THudiQueryType hudiQueryType = hudiMetadataParams.getHudiQueryType(); + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(hudiMetadataParams.getCatalog()); + if (catalog == null) { + return errorResult("The specified catalog does not exist:" + hudiMetadataParams.getCatalog()); + } + HudiCachedMetaClientProcessor hudiMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getHudiMetadataCacheMgr().getHudiMetaClientProcessor(catalog); + String hudiBasePathString = ((HMSExternalCatalog) catalog).getClient() + .getTable(hudiMetadataParams.getDatabase(), hudiMetadataParams.getTable()).getSd().getLocation(); + Configuration conf = ((HMSExternalCatalog) catalog).getConfiguration(); + + List<TRow> dataBatch = Lists.newArrayList(); + TFetchSchemaTableDataResult result 
= new TFetchSchemaTableDataResult(); + + switch (hudiQueryType) { + case TIMELINE: + HoodieTimeline timeline = hudiMetadataCache.getHoodieTableMetaClient(hudiMetadataParams.getDatabase(), + hudiMetadataParams.getTable(), hudiBasePathString, conf).getActiveTimeline(); + for (HoodieInstant instant : timeline.getInstants()) { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(instant.getTimestamp())); + trow.addToColumnValue(new TCell().setStringVal(instant.getAction())); + trow.addToColumnValue(new TCell().setStringVal(instant.getFileName())); + trow.addToColumnValue(new TCell().setStringVal(instant.getState().name())); + trow.addToColumnValue(new TCell().setStringVal(instant.getStateTransitionTime())); + dataBatch.add(trow); + } + break; + default: + return errorResult("Unsupported hudi inspect type: " + hudiQueryType); + } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + private static TFetchSchemaTableDataResult backendsMetadataResult(TMetadataTableRequestParams params) { if (!params.isSetBackendsMetadataParams()) { return errorResult("backends metadata param is not set."); @@ -1114,7 +1165,7 @@ public class MetadataGenerator { } private static void tableOptionsForInternalCatalog(UserIdentity currentUserIdentity, - CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { + CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { for (TableIf table : tables) { if (!(table instanceof OlapTable)) { continue; @@ -1167,7 +1218,7 @@ public class MetadataGenerator { } private static void tableOptionsForExternalCatalog(UserIdentity currentUserIdentity, - CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { + CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { for (TableIf table : tables) { if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUserIdentity, 
catalog.getName(), database.getFullName(), table.getName(), PrivPredicate.SHOW)) { @@ -1209,7 +1260,7 @@ public class MetadataGenerator { String clg = params.getCatalog(); CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); if (catalog == null) { - // catalog is NULL let return empty to BE + // catalog is NULL let return empty to BE result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); return result; @@ -1218,7 +1269,8 @@ public class MetadataGenerator { if (database == null) { // BE gets the database id list from FE and then invokes this interface // per database. there is a chance that in between database can be dropped. - // so need to handle database not exist case and return ok so that BE continue the + // so need to handle database not exist case and return ok so that BE continue + // the // loop with next database. result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); @@ -1236,7 +1288,7 @@ public class MetadataGenerator { } private static void tablePropertiesForInternalCatalog(UserIdentity currentUserIdentity, - CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { + CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { for (TableIf table : tables) { if (!(table instanceof OlapTable)) { continue; @@ -1261,7 +1313,7 @@ public class MetadataGenerator { continue; } - Map<String, String> propertiesMap = property.getProperties(); + Map<String, String> propertiesMap = property.getProperties(); propertiesMap.forEach((key, value) -> { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(catalog.getName())); // TABLE_CATALOG @@ -1284,7 +1336,8 @@ public class MetadataGenerator { database.getFullName(), table.getName(), PrivPredicate.SHOW)) { continue; } - // Currently for external catalog, we put properties as empty, can extend in future + // Currently for external catalog, we put properties as empty, can extend in + 
// future TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(catalog.getName())); // TABLE_CATALOG trow.addToColumnValue(new TCell().setStringVal(database.getFullName())); // TABLE_SCHEMA @@ -1316,7 +1369,7 @@ public class MetadataGenerator { List<TRow> dataBatch = Lists.newArrayList(); CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); if (catalog == null) { - // catalog is NULL let return empty to BE + // catalog is NULL let return empty to BE result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); return result; @@ -1325,8 +1378,8 @@ public class MetadataGenerator { if (database == null) { // BE gets the database id list from FE and then invokes this interface // per database. there is a chance that in between database can be dropped. - // so need to handle database not exist case and return ok so that BE continue the - // loop with next database. + // so need to handle database not exist case and return ok so that BE continue + // the loop with next database. 
result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); return result; @@ -1369,7 +1422,7 @@ public class MetadataGenerator { } private static void partitionsForInternalCatalog(UserIdentity currentUserIdentity, - CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { + CatalogIf catalog, DatabaseIf database, List<TableIf> tables, List<TRow> dataBatch) { for (TableIf table : tables) { if (!(table instanceof OlapTable)) { continue; @@ -1392,8 +1445,8 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setStringVal(partition.getName())); // PARTITION_NAME trow.addToColumnValue(new TCell().setStringVal("NULL")); // SUBPARTITION_NAME (always null) - trow.addToColumnValue(new TCell().setIntVal(0)); //PARTITION_ORDINAL_POSITION (not available) - trow.addToColumnValue(new TCell().setIntVal(0)); //SUBPARTITION_ORDINAL_POSITION (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); // PARTITION_ORDINAL_POSITION (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); // SUBPARTITION_ORDINAL_POSITION (not available) trow.addToColumnValue(new TCell().setStringVal( olapTable.getPartitionInfo().getType().toString())); // PARTITION_METHOD trow.addToColumnValue(new TCell().setStringVal("NULL")); // SUBPARTITION_METHOD(always null) @@ -1406,23 +1459,23 @@ public class MetadataGenerator { } else { trow.addToColumnValue(new TCell().setStringVal( olapTable.getPartitionInfo() - .getDisplayPartitionColumns().toString())); // PARTITION_EXPRESSION + .getDisplayPartitionColumns().toString())); // PARTITION_EXPRESSION trow.addToColumnValue( new TCell().setStringVal("NULL")); // SUBPARTITION_EXPRESSION (always null) trow.addToColumnValue(new TCell().setStringVal( item.getItemsSql())); // PARITION DESC } - trow.addToColumnValue(new TCell().setLongVal(partition.getRowCount())); //TABLE_ROWS (PARTITION row) - trow.addToColumnValue(new TCell().setLongVal(partition.getAvgRowLength())); //AVG_ROW_LENGTH - 
trow.addToColumnValue(new TCell().setLongVal(partition.getDataLength())); //DATA_LENGTH - trow.addToColumnValue(new TCell().setIntVal(0)); //MAX_DATA_LENGTH (not available) - trow.addToColumnValue(new TCell().setIntVal(0)); //INDEX_LENGTH (not available) - trow.addToColumnValue(new TCell().setIntVal(0)); //DATA_FREE (not available) - trow.addToColumnValue(new TCell().setStringVal("NULL")); //CREATE_TIME (not available) + trow.addToColumnValue(new TCell().setLongVal(partition.getRowCount())); // TABLE_ROWS (PARTITION) + trow.addToColumnValue(new TCell().setLongVal(partition.getAvgRowLength())); // AVG_ROW_LENGTH + trow.addToColumnValue(new TCell().setLongVal(partition.getDataLength())); // DATA_LENGTH + trow.addToColumnValue(new TCell().setIntVal(0)); // MAX_DATA_LENGTH (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); // INDEX_LENGTH (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); // DATA_FREE (not available) + trow.addToColumnValue(new TCell().setStringVal("NULL")); // CREATE_TIME (not available) trow.addToColumnValue(new TCell().setStringVal( - TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); //UPDATE_TIME + TimeUtils.longToTimeString(partition.getVisibleVersionTime()))); // UPDATE_TIME trow.addToColumnValue(new TCell().setStringVal("NULL")); // CHECK_TIME (not available) - trow.addToColumnValue(new TCell().setIntVal(0)); //CHECKSUM (not available) + trow.addToColumnValue(new TCell().setIntVal(0)); // CHECKSUM (not available) trow.addToColumnValue(new TCell().setStringVal("")); // PARTITION_COMMENT (not available) trow.addToColumnValue(new TCell().setStringVal("")); // NODEGROUP (not available) trow.addToColumnValue(new TCell().setStringVal("")); // TABLESPACE_NAME (not available) @@ -1466,7 +1519,7 @@ public class MetadataGenerator { List<TRow> dataBatch = Lists.newArrayList(); CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(clg); if (catalog == null) { - // catalog is NULL let return empty 
to BE + // catalog is NULL let return empty to BE result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); return result; @@ -1475,7 +1528,8 @@ public class MetadataGenerator { if (database == null) { // BE gets the database id list from FE and then invokes this interface // per database. there is a chance that in between database can be dropped. - // so need to handle database not exist case and return ok so that BE continue the + // so need to handle database not exist case and return ok so that BE continue + // the // loop with next database. result.setDataBatch(dataBatch); result.setStatus(new TStatus(TStatusCode.OK)); @@ -1626,4 +1680,3 @@ public class MetadataGenerator { } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index 7bd28f363e7..074f3d66c96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -41,6 +41,8 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf return FrontendsDisksTableValuedFunction.getColumnIndexFromColumnName(columnName); case ICEBERG: return IcebergTableValuedFunction.getColumnIndexFromColumnName(columnName); + case HUDI: + return HudiTableValuedFunction.getColumnIndexFromColumnName(columnName); case CATALOGS: return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName); case MATERIALIZED_VIEWS: diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index eb323a76672..133c1885a84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -44,7 +44,7 @@ public abstract class TableValuedFunctionIf { // All table functions should be registered here public static TableValuedFunctionIf getTableFunction(String funcName, Map<String, String> params) - throws AnalysisException { + throws AnalysisException { switch (funcName.toLowerCase()) { case NumbersTableValuedFunction.NAME: return new NumbersTableValuedFunction(params); @@ -58,6 +58,8 @@ public abstract class TableValuedFunctionIf { return new LocalTableValuedFunction(params); case IcebergTableValuedFunction.NAME: return new IcebergTableValuedFunction(params); + case HudiTableValuedFunction.NAME: + return new HudiTableValuedFunction(params); case BackendsTableValuedFunction.NAME: return new BackendsTableValuedFunction(params); case FrontendsTableValuedFunction.NAME: diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2b753a45ac1..858f4da4fe6 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -988,6 +988,7 @@ struct TMetadataTableRequestParams { 11: optional PlanNodes.TPartitionsMetadataParams partitions_metadata_params 12: optional PlanNodes.TMetaCacheStatsParams meta_cache_stats_params 13: optional PlanNodes.TPartitionValuesMetadataParams partition_values_metadata_params + 14: optional PlanNodes.THudiMetadataParams hudi_metadata_params } struct TSchemaTableRequestParams { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7ccb12b3331..4eb29c40ff6 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -511,6 +511,13 @@ struct TIcebergMetadataParams { 4: optional string table } +struct THudiMetadataParams { + 1: optional Types.THudiQueryType hudi_query_type + 2: optional string catalog + 3: optional string database + 4: optional string table +} + struct TBackendsMetadataParams { 1: optional string cluster_name } @@ -571,6 
+578,7 @@ struct TMetaScanRange { 9: optional TPartitionsMetadataParams partitions_params 10: optional TMetaCacheStatsParams meta_cache_stats_params 11: optional TPartitionValuesMetadataParams partition_values_params + 12: optional THudiMetadataParams hudi_params } // Specification of an individual data range which is held in its entirety diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 8639a1a1476..a31a1995d34 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -717,13 +717,18 @@ enum TMetadataType { TASKS, WORKLOAD_SCHED_POLICY, PARTITIONS, - PARTITION_VALUES; + PARTITION_VALUES, + HUDI, } enum TIcebergQueryType { SNAPSHOTS } +enum THudiQueryType { + TIMELINE +} + // represent a user identity struct TUserIdentity { 1: optional string username diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_meta.out b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out new file mode 100644 index 00000000000..0c312efd3c0 Binary files /dev/null and b/regression-test/data/external_table_p2/hudi/test_hudi_meta.out differ diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy new file mode 100644 index 00000000000..0fe650aded9 --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_meta.groovy @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hudi_meta", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable hudi test") + } + + String catalog_name = "test_hudi_meta" + String props = context.config.otherConfigs.get("hudiEmrCatalog") + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + ${props} + ); + """ + + sql """ switch ${catalog_name};""" + sql """ use regression_hudi;""" + sql """ set enable_fallback_to_original_planner=false """ + + qt_hudi_meta1 """ select * from hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_non_partition", "query_type" = "timeline"); """ + qt_hudi_meta2 """ select * from hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_mor_non_partition", "query_type" = "timeline"); """ + qt_hudi_meta3 """ select * from hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_partition", "query_type" = "timeline"); """ + qt_hudi_meta4 """ select * from hudi_meta("table"="${catalog_name}.regression_hudi.user_activity_log_cow_partition", "query_type" = "timeline"); """ + + qt_hudi_meta5 """ select * from hudi_meta("table"="${catalog_name}.regression_hudi.timetravel_cow", "query_type" = "timeline"); """ + qt_hudi_meta6 """ select * from hudi_meta("table"="${catalog_name}.regression_hudi.timetravel_mor", "query_type" = "timeline"); """ + + sql """drop catalog if exists 
${catalog_name};""" +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
