This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 260568db172ee2e8cc939ba5e68876ad2e2dd9dd
Author: Ashin Gau <[email protected]>
AuthorDate: Thu Feb 22 17:42:37 2024 +0800

    [update](hudi) update hudi version to 0.14.1 and compatible with flink hive 
catalog (#31181)
    
    1. Update hudi version from 0.13.1 to 0.14.1
    2. Compatible with the hudi table created by flink hive catalog
---
 .../org/apache/doris/hudi/BaseSplitReader.scala      |  2 +-
 .../doris/datasource/hive/HMSExternalTable.java      |  5 ++++-
 .../hudi/source/HudiPartitionProcessor.java          |  4 +---
 .../doris/datasource/hudi/source/HudiScanNode.java   | 20 +++++---------------
 fe/pom.xml                                           |  2 +-
 .../data/external_table_p2/hive/test_hive_hudi.out   | 16 ++++++++++++++++
 .../external_table_p2/hive/test_hive_hudi.groovy     |  6 ++++++
 7 files changed, 34 insertions(+), 21 deletions(-)

diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index 3c10f8a4cd7..a730f2cd1b2 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -464,7 +464,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
                                              options: Map[String, String],
                                              hadoopConf: Configuration,
                                              appendPartitionValues: Boolean = 
false): PartitionedFile => Iterator[InternalRow] = {
-    val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+    val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
     val readParquetFile: PartitionedFile => Iterator[Any] = 
parquetFileFormat.buildReaderWithPartitionValues(
       sparkSession = sparkSession,
       dataSchema = dataSchema,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 62b2c35b8c5..6da6073a4a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -218,8 +218,11 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         if (remoteTable.getSd() == null) {
             return false;
         }
+        Map<String, String> paras = remoteTable.getParameters();
         String inputFormatName = remoteTable.getSd().getInputFormat();
-        return inputFormatName != null && 
SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName);
+        // compatible with flink hive catalog
+        return (paras != null && 
"hudi".equalsIgnoreCase(paras.get("flink.connector")))
+                || (inputFormatName != null && 
SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName));
     }
 
     public boolean isHoodieCowTable() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index 91f9eff259d..4baa1477041 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
-import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -52,8 +51,7 @@ public abstract class HudiPartitionProcessor {
 
         HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
                 new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), 
metadataConfig,
-                tableMetaClient.getBasePathV2().toString(),
-                FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), 
true);
+                tableMetaClient.getBasePathV2().toString(), true);
 
         return newTableMetadata.getAllPartitionPaths();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 4f4b1c3e8ff..dfbb12e8584 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -47,7 +47,6 @@ import com.google.common.collect.Maps;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
@@ -93,8 +92,10 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
         super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv);
-        isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals(
-                
hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"));
+        Map<String, String> paras = hmsTable.getRemoteTable().getParameters();
+        isCowOrRoTable = hmsTable.isHoodieCowTable()
+                || 
"skip_merge".equals(hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"))
+                || (paras != null && 
"COPY_ON_WRITE".equalsIgnoreCase(paras.get("flink.table.type")));
         if (isCowOrRoTable) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Hudi table {} can read as cow/read optimize table", 
hmsTable.getName());
@@ -239,19 +240,8 @@ public class HudiScanNode extends HiveScanNode {
 
         List<String> columnNames = new ArrayList<>();
         List<String> columnTypes = new ArrayList<>();
-        List<FieldSchema> allFields = Lists.newArrayList();
-        allFields.addAll(hmsTable.getRemoteTable().getSd().getCols());
-        allFields.addAll(hmsTable.getRemoteTable().getPartitionKeys());
-
         for (Schema.Field hudiField : hudiSchema.getFields()) {
-            String columnName = hudiField.name().toLowerCase(Locale.ROOT);
-            // keep hive metastore column in hudi avro schema.
-            Optional<FieldSchema> field = allFields.stream().filter(f -> 
f.getName().equals(columnName)).findFirst();
-            if (!field.isPresent()) {
-                String errorMsg = String.format("Hudi column %s not exists in 
hive metastore.", hudiField.name());
-                throw new IllegalArgumentException(errorMsg);
-            }
-            columnNames.add(columnName);
+            columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
             String columnType = 
HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema());
             columnTypes.add(columnType);
         }
diff --git a/fe/pom.xml b/fe/pom.xml
index 3fd1bfc751f..5ebd759f013 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -307,7 +307,7 @@ under the License.
         <avro.version>1.11.3</avro.version>
         <arrow.version>15.0.0</arrow.version>
         <!-- hudi -->
-        <hudi.version>0.13.1</hudi.version>
+        <hudi.version>0.14.1</hudi.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
         <presto.hive.version>3.0.0-8</presto.hive.version>
 
diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi.out 
b/regression-test/data/external_table_p2/hive/test_hive_hudi.out
index adff3889cf2..9202749e617 100644
--- a/regression-test/data/external_table_p2/hive/test_hive_hudi.out
+++ b/regression-test/data/external_table_p2/hive/test_hive_hudi.out
@@ -119,6 +119,22 @@ row_4      2021-02-01      4       v_4
 20230922203209630      20230922203209630_0_992 994             
8b83d7ed-c150-4177-9dbb-d41169e8b9c7-0_0-163-0_20230922203209630.parquet        
994     a991    [[991], [330], [991, -9, null]] {"k991":[1, null, 9], "k2":[], 
"k3":null, "k4":[null, 9]}       {"col1": "2012-02-02 06:24:05.000000", "col2": 
[1000, 991, null]}
 20230922203209630      20230922203209630_0_9910        9912            
8b83d7ed-c150-4177-9dbb-d41169e8b9c7-0_0-163-0_20230922203209630.parquet        
9912    a9909   [[9909], [3303], [9909, 8909, null]]    {"k9909":[1, null, 9], 
"k2":[], "k3":null, "k4":[null, 9]}      {"col1": "2012-02-04 02:50:13.000000", 
"col2": [9918, 9909, null]}
 
+-- !flink_hive_catalog --
+20240221112835666      20240221112835666_0_1   
1dced545-862b-4ceb-8b43-d2a568f6616b            
cc1f45f0-5d1e-4c3e-872e-b4c123d1250b    1695332066204   
1dced545-862b-4ceb-8b43-d2a568f6616b    rider-E driver-O        93.5    
san_francisco
+20240221112835666      20240221112835666_0_3   
3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04            
cc1f45f0-5d1e-4c3e-872e-b4c123d1250b    1695173887231   
3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04    rider-I driver-S        41.06   chennai
+20240221112835666      20240221112835666_0_4   
7a84095f-737f-40bc-b62f-6b69664712d2            
cc1f45f0-5d1e-4c3e-872e-b4c123d1250b    1695376420876   
7a84095f-737f-40bc-b62f-6b69664712d2    rider-G driver-Q        43.4    
sao_paulo
+20240221112917786      20240221112917786_0_1   
9909a8b1-2d15-4d3d-8ec9-efc48c536a00            
cc1f45f0-5d1e-4c3e-872e-b4c123d1250b    1695046462179   
9909a8b1-2d15-4d3d-8ec9-efc48c536a00    rider-D driver-L        88.88   
san_francisco
+20240221112835666      20240221112835666_0_7   
e3cf430c-889d-4015-bc98-59bdce1e530c            
cc1f45f0-5d1e-4c3e-872e-b4c123d1250b    1695516137016   
e3cf430c-889d-4015-bc98-59bdce1e530c    rider-F driver-P        34.15   
sao_paulo
+20240221112835666      20240221112835666_0_8   
e96c4396-3fad-413a-a942-4cb36106d721            
cc1f45f0-5d1e-4c3e-872e-b4c123d1250b    1695091554788   
e96c4396-3fad-413a-a942-4cb36106d721    rider-C driver-M        27.7    
san_francisco
+
+-- !flink_hudi_catalog --
+20240221111045165      20240221111045165_0_1   
334e26e9-8355-45cc-97c6-c31daf0df330    san_francisco   
3efcaa94-3e58-436a-b489-1232731ed088    1695159649087   
334e26e9-8355-45cc-97c6-c31daf0df330    rider-A driver-K        25.0    
san_francisco
+20240221111000868      20240221111000868_0_5   
3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04    chennai 
44d55961-1263-4639-bcab-abe4d240b009    1695173887231   
3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04    rider-I driver-S        41.06   chennai
+20240221111000868      20240221111000868_0_3   
9909a8b1-2d15-4d3d-8ec9-efc48c536a00    san_francisco   
3efcaa94-3e58-436a-b489-1232731ed088    1695046462179   
9909a8b1-2d15-4d3d-8ec9-efc48c536a00    rider-D driver-L        33.9    
san_francisco
+20240221111000868      20240221111000868_0_6   
c8abbe79-8d89-47ea-b4ce-4d224bae5bfa    chennai 
44d55961-1263-4639-bcab-abe4d240b009    1695115999911   
c8abbe79-8d89-47ea-b4ce-4d224bae5bfa    rider-J driver-T        17.85   chennai
+20240221111000868      20240221111000868_0_8   
e3cf430c-889d-4015-bc98-59bdce1e530c    sao_paulo       
c97347e9-033a-4c19-a033-94ac1de9f892    1695516137016   
e3cf430c-889d-4015-bc98-59bdce1e530c    rider-F driver-P        34.15   
sao_paulo
+20240221111000868      20240221111000868_0_4   
e96c4396-3fad-413a-a942-4cb36106d721    san_francisco   
3efcaa94-3e58-436a-b489-1232731ed088    1695091554788   
e96c4396-3fad-413a-a942-4cb36106d721    rider-C driver-M        27.7    
san_francisco
+
 -- !skip_merge --
 20230605145009209      20230605145009209_0_0   rowId:row_1     
partitionId=2021-01-01/versionId=v_0    
65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet        
row_1   2021-01-01      0       bob     v_0     toBeDel0        0       1000000
 20230605145403388      20230605145403388_2_0   rowId:row_1     
partitionId=2011-11-11/versionId=v_1    
dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet       
row_1   2011-11-11      1       bob     v_1     toBeDel1        0       1000001
diff --git 
a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy 
b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy
index 8648aa4bc98..d852e604df5 100644
--- a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy
+++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy
@@ -42,6 +42,12 @@ suite("test_hive_hudi", "p2,external,hive,hudi") {
         // test complex types
         qt_complex_types """select * from complex_type_rt order by name desc 
limit 100"""
 
+        // hudi table created by flink hive catalog
+        qt_flink_hive_catalog """select * from hive_ctl_table order by uuid"""
+
+        // hudi table created by flink hudi catalog
+        qt_flink_hudi_catalog """select * from hudi_ctl_table order by uuid"""
+
         // skip logs
         sql """drop catalog if exists ${catalog_name};"""
         sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to