This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 389c702a3f957730c14503ac131e2f82563fc303 Author: wuwenchi <[email protected]> AuthorDate: Thu Feb 22 13:39:17 2024 +0800 [improvement](hudi)Obtain partition information through HMS's API (#30962) When a Hudi table is synchronized to HMS, the partition information is also synchronized, so even if metastore sync is not enabled for the Hudi table (for example, when the metastore sync option is set to false for a Hudi table created with Flink), you can still obtain the partition information through the HMS API. --- docs/en/docs/lakehouse/multi-catalog/hudi.md | 6 +++++ docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md | 6 +++++ .../doris/datasource/hive/HMSExternalTable.java | 11 +++++++++ .../hudi/source/HudiCachedPartitionProcessor.java | 27 ++++++++++++++++++---- .../doris/datasource/hudi/source/HudiScanNode.java | 8 +++++-- 5 files changed, 52 insertions(+), 6 deletions(-) diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md index 52892db2df2..a52c2370ced 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES ( ); ``` +Optional configuration parameters: + +|name|description|default| +|---|---|---| +|use_hive_sync_partition|Use hms synchronized partition data|false| + ## Column Type Mapping Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md). 
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index b619283cacf..38bb26d3bc7 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES ( ); ``` +可选配置参数: + +|参数名|说明|默认值| +|---|---|---| +|use_hive_sync_partition|使用hms已同步的分区数据|false| + ## 列类型映射 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index c841c749fb2..62b2c35b8c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -104,6 +104,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private static final String SPARK_STATS_MAX_LEN = ".avgLen"; private static final String SPARK_STATS_HISTOGRAM = ".histogram"; + private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition"; + + static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); @@ -227,6 +230,14 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + /** + * Some data lakes (such as Hudi) will synchronize their partition information to HMS, + * then we can quickly obtain the partition information of the table from HMS. + */ + public boolean useHiveSyncPartition() { + return Boolean.parseBoolean(catalog.getProperties().getOrDefault(USE_HIVE_SYNC_PARTITION, "false")); + } + /** * Now we only support three file input format hive tables: parquet/orc/text. * Support managed_table and external_table. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index eb09c5efb59..90c89dbbda2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey; +import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import com.google.common.base.Preconditions; @@ -31,6 +32,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; @@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { + private static final Logger LOG = LoggerFactory.getLogger(HudiCachedPartitionProcessor.class); private final long catalogId; private final Executor executor; private final LoadingCache<TablePartitionKey, TablePartitionValues> partitionCache; @@ -81,7 +85,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { } public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, - HoodieTableMetaClient tableMetaClient, String timestamp) { + HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) { Preconditions.checkState(catalogId == 
table.getCatalog().getId()); Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); if (!partitionColumns.isPresent()) { @@ -94,7 +98,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { } long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); if (Long.parseLong(timestamp) == lastTimestamp) { - return getPartitionValues(table, tableMetaClient); + return getPartitionValues(table, tableMetaClient, useHiveSyncPartition); } List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp); List<String> partitionNames = Arrays.asList(partitionColumns.get()); @@ -105,7 +109,8 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { return partitionValues; } - public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient) + public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient, + boolean useHiveSyncPartition) throws CacheException { Preconditions.checkState(catalogId == table.getCatalog().getId()); Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); @@ -137,7 +142,21 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - List<String> partitionNames = getAllPartitionNames(tableMetaClient); + HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog(); + List<String> partitionNames; + if (useHiveSyncPartition) { + // When a Hudi table is synchronized to HMS, the partition information is also synchronized, + // so even if the metastore is not enabled in the Hudi table + // (for example, if the Metastore is false for a Hudi table created with Flink), + // we can still obtain the partition information through the HMS API. 
+ partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName()); + if (partitionNames.size() == 0) { + LOG.warn("Failed to get partitions from hms api, switch it from hudi api."); + partitionNames = getAllPartitionNames(tableMetaClient); + } + } else { + partitionNames = getAllPartitionNames(tableMetaClient); + } List<String> partitionColumnsList = Arrays.asList(partitionColumns.get()); partitionValues.cleanPartitions(); partitionValues.addPartitions(partitionNames, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 144d308140e..4f4b1c3e8ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -83,6 +83,8 @@ public class HudiScanNode extends HiveScanNode { private final AtomicLong noLogsSplitNum = new AtomicLong(0); + private final boolean useHiveSyncPartition; + /** * External file scan node for Query Hudi table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -102,6 +104,7 @@ public class HudiScanNode extends HiveScanNode { LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName()); } } + useHiveSyncPartition = hmsTable.useHiveSyncPartition(); } @Override @@ -171,9 +174,10 @@ public class HudiScanNode extends HiveScanNode { .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); TablePartitionValues partitionValues; if (snapshotTimestamp.isPresent()) { - partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get()); + partitionValues = processor.getSnapshotPartitionValues( + hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition); } else { - partitionValues = processor.getPartitionValues(hmsTable, metaClient); + 
partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition); } if (partitionValues != null) { // 2. prune partitions by expr --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
