This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 389c702a3f957730c14503ac131e2f82563fc303
Author: wuwenchi <[email protected]>
AuthorDate: Thu Feb 22 13:39:17 2024 +0800

    [improvement](hudi)Obtain partition information through HMS's API (#30962)
    
    When a Hudi table is synchronized to HMS, the partition information is also 
synchronized, so even if metastore-based sync is not enabled in the Hudi table 
(for example, when the metastore sync option is set to false for a Hudi table 
created with Flink), you can still obtain the partition information through the 
HMS API.
---
 docs/en/docs/lakehouse/multi-catalog/hudi.md       |  6 +++++
 docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md    |  6 +++++
 .../doris/datasource/hive/HMSExternalTable.java    | 11 +++++++++
 .../hudi/source/HudiCachedPartitionProcessor.java  | 27 ++++++++++++++++++----
 .../doris/datasource/hudi/source/HudiScanNode.java |  8 +++++--
 5 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md 
b/docs/en/docs/lakehouse/multi-catalog/hudi.md
index 52892db2df2..a52c2370ced 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md
@@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES (
 );
 ```
 
+Optional configuration parameters:
+
+|name|description|default|
+|---|---|---|
+|use_hive_sync_partition|Use hms synchronized partition data|false|
+
 ## Column Type Mapping
 
 Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md).
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md 
b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
index b619283cacf..38bb26d3bc7 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
@@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES (
 );
 ```
 
+可选配置参数:
+
+|参数名|说明|默认值|
+|---|---|---|
+|use_hive_sync_partition|使用hms已同步的分区数据|false|
+
 ## 列类型映射
 
 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index c841c749fb2..62b2c35b8c5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -104,6 +104,9 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     private static final String SPARK_STATS_MAX_LEN = ".avgLen";
     private static final String SPARK_STATS_HISTOGRAM = ".histogram";
 
+    private static final String USE_HIVE_SYNC_PARTITION = 
"use_hive_sync_partition";
+
+
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -227,6 +230,14 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return 
"org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName);
     }
 
+    /**
+     * Some data lakes (such as Hudi) will synchronize their partition 
information to HMS,
+     * then we can quickly obtain the partition information of the table from 
HMS.
+     */
+    public boolean useHiveSyncPartition() {
+        return 
Boolean.parseBoolean(catalog.getProperties().getOrDefault(USE_HIVE_SYNC_PARTITION,
 "false"));
+    }
+
     /**
      * Now we only support three file input format hive tables: 
parquet/orc/text.
      * Support managed_table and external_table.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index eb09c5efb59..90c89dbbda2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.TablePartitionValues;
 import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 
 import com.google.common.base.Preconditions;
@@ -31,6 +32,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HudiCachedPartitionProcessor.class);
     private final long catalogId;
     private final Executor executor;
     private final LoadingCache<TablePartitionKey, TablePartitionValues> 
partitionCache;
@@ -81,7 +85,7 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
     }
 
     public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable 
table,
-            HoodieTableMetaClient tableMetaClient, String timestamp) {
+            HoodieTableMetaClient tableMetaClient, String timestamp, boolean 
useHiveSyncPartition) {
         Preconditions.checkState(catalogId == table.getCatalog().getId());
         Option<String[]> partitionColumns = 
tableMetaClient.getTableConfig().getPartitionFields();
         if (!partitionColumns.isPresent()) {
@@ -94,7 +98,7 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
         }
         long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
         if (Long.parseLong(timestamp) == lastTimestamp) {
-            return getPartitionValues(table, tableMetaClient);
+            return getPartitionValues(table, tableMetaClient, 
useHiveSyncPartition);
         }
         List<String> partitionNameAndValues = 
getPartitionNamesBeforeOrEquals(timeline, timestamp);
         List<String> partitionNames = Arrays.asList(partitionColumns.get());
@@ -105,7 +109,8 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
         return partitionValues;
     }
 
-    public TablePartitionValues getPartitionValues(HMSExternalTable table, 
HoodieTableMetaClient tableMetaClient)
+    public TablePartitionValues getPartitionValues(HMSExternalTable table, 
HoodieTableMetaClient tableMetaClient,
+                                                   boolean 
useHiveSyncPartition)
             throws CacheException {
         Preconditions.checkState(catalogId == table.getCatalog().getId());
         Option<String[]> partitionColumns = 
tableMetaClient.getTableConfig().getPartitionFields();
@@ -137,7 +142,21 @@ public class HudiCachedPartitionProcessor extends 
HudiPartitionProcessor {
                 if (lastTimestamp <= lastUpdateTimestamp) {
                     return partitionValues;
                 }
-                List<String> partitionNames = 
getAllPartitionNames(tableMetaClient);
+                HMSExternalCatalog catalog = (HMSExternalCatalog) 
table.getCatalog();
+                List<String> partitionNames;
+                if (useHiveSyncPartition) {
+                    // When a Hudi table is synchronized to HMS, the partition 
information is also synchronized,
+                    // so even if the metastore is not enabled in the Hudi 
table
+                    //     (for example, if the Metastore is false for a Hudi 
table created with Flink),
+                    // we can still obtain the partition information through 
the HMS API.
+                    partitionNames = 
catalog.getClient().listPartitionNames(table.getDbName(), table.getName());
+                    if (partitionNames.size() == 0) {
+                        LOG.warn("Failed to get partitions from hms api, 
switch it from hudi api.");
+                        partitionNames = getAllPartitionNames(tableMetaClient);
+                    }
+                } else {
+                    partitionNames = getAllPartitionNames(tableMetaClient);
+                }
                 List<String> partitionColumnsList = 
Arrays.asList(partitionColumns.get());
                 partitionValues.cleanPartitions();
                 partitionValues.addPartitions(partitionNames,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 144d308140e..4f4b1c3e8ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -83,6 +83,8 @@ public class HudiScanNode extends HiveScanNode {
 
     private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
+    private final boolean useHiveSyncPartition;
+
     /**
      * External file scan node for Query Hudi table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
@@ -102,6 +104,7 @@ public class HudiScanNode extends HiveScanNode {
                 LOG.debug("Hudi table {} is a mor table, and will use JNI to 
read data in BE", hmsTable.getName());
             }
         }
+        useHiveSyncPartition = hmsTable.useHiveSyncPartition();
     }
 
     @Override
@@ -171,9 +174,10 @@ public class HudiScanNode extends HiveScanNode {
                     
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
             TablePartitionValues partitionValues;
             if (snapshotTimestamp.isPresent()) {
-                partitionValues = 
processor.getSnapshotPartitionValues(hmsTable, metaClient, 
snapshotTimestamp.get());
+                partitionValues = processor.getSnapshotPartitionValues(
+                    hmsTable, metaClient, snapshotTimestamp.get(), 
useHiveSyncPartition);
             } else {
-                partitionValues = processor.getPartitionValues(hmsTable, 
metaClient);
+                partitionValues = processor.getPartitionValues(hmsTable, 
metaClient, useHiveSyncPartition);
             }
             if (partitionValues != null) {
                 // 2. prune partitions by expr


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to