This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f80df20b6f [Fix](multi-catalog) Fix read error in mixed partition locations. (#21399)
f80df20b6f is described below

commit f80df20b6ffe254f42ebc8e4575a1b8fe278155b
Author: Qi Chen <[email protected]>
AuthorDate: Mon Jul 3 15:14:17 2023 +0800

    [Fix](multi-catalog) Fix read error in mixed partition locations. (#21399)
    
    Issue Number: close #20948
    
    Fix read errors when a table has mixed partition locations (for example,
    some partition locations are on s3 while others are on hdfs) by resolving
    `getLocationType` at the file-split level instead of the table level.
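    
    The gist of the change, as a minimal self-contained sketch (the enum
    constants mirror Doris' TFileType, but the resolver below is a
    hypothetical stand-in for the real getTFileType(location) helper, not
    the actual Doris API):
    
        import java.util.List;
    
        enum TFileType { FILE_HDFS, FILE_S3, FILE_BROKER, FILE_NET }
    
        class MixedLocationSketch {
            // Hypothetical stand-in for getTFileType(location): infer the
            // storage type from the scheme of a single split path.
            static TFileType resolve(String location) {
                if (location.startsWith("s3://") || location.startsWith("s3a://")) {
                    return TFileType.FILE_S3;
                }
                if (location.startsWith("hdfs://")) {
                    return TFileType.FILE_HDFS;
                }
                throw new IllegalArgumentException("Unknown file location " + location);
            }
    
            public static void main(String[] args) {
                // One table whose partitions live on different storage systems.
                List<String> splitPaths = List.of(
                        "hdfs://nameservice1/warehouse/t/par=20230101/f1.parquet",
                        "s3://bucket/warehouse/t/par=20230102/f2.parquet");
                // Before the fix, the type was resolved once from the table
                // location and reused for every split; now each split's own
                // path decides its type.
                for (String path : splitPaths) {
                    System.out.println(path + " -> " + resolve(path));
                }
            }
        }
    
    Keeping the no-argument getLocationType() as a thin wrapper over the new
    per-location overload, as each scan node does in the diff below, lets
    existing call sites keep working.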
---
 .../doris/planner/external/FileQueryScanNode.java  | 82 ++++++++++------------
 .../doris/planner/external/HiveScanNode.java       |  6 +-
 .../doris/planner/external/MaxComputeScanNode.java |  5 ++
 .../apache/doris/planner/external/TVFScanNode.java |  5 ++
 .../planner/external/iceberg/IcebergScanNode.java  |  6 ++
 .../planner/external/paimon/PaimonScanNode.java    |  6 +-
 .../hive/test_mixed_par_locations.out              | 37 ++++++++++
 .../hive/test_mixed_par_locations.groovy           | 63 +++++++++++++++++
 8 files changed, 164 insertions(+), 46 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index ed004ff170..39727a6f04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -230,51 +230,42 @@ public abstract class FileQueryScanNode extends FileScanNode {
         if (inputSplits.isEmpty()) {
             return;
         }
-        FileSplit inputSplit = (FileSplit) inputSplits.get(0);
-        TFileType locationType = getLocationType();
-        params.setFileType(locationType);
         TFileFormatType fileFormatType = getFileFormatType();
         params.setFormatType(fileFormatType);
-        TFileCompressType fileCompressType = getFileCompressType(inputSplit);
-        params.setCompressType(fileCompressType);
-        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
-        if (isCsvOrJson) {
-            params.setFileAttributes(getFileAttributes());
-        }
-
-        // set hdfs params for hdfs file type.
-        Map<String, String> locationProperties = getLocationProperties();
-        if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
-            params.setProperties(locationProperties);
-        }
-        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
-            String fsName = getFsName(inputSplit);
-            THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
-            tHdfsParams.setFsName(fsName);
-            params.setHdfsParams(tHdfsParams);
-
-            if (locationType == TFileType.FILE_BROKER) {
-                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                if (broker == null) {
-                    throw new UserException("No alive broker.");
-                }
-                params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
-            }
-        }
-
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
+            TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
             FileSplit fileSplit = (FileSplit) split;
+            TFileType locationType = getLocationType(fileSplit.getPath().toString());
+            scanRangeParams.setFileType(locationType);
+            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+            scanRangeParams.setCompressType(fileCompressType);
+            boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
+            if (isCsvOrJson) {
+                scanRangeParams.setFileAttributes(getFileAttributes());
+            }
 
-            TFileScanRangeParams scanRangeParams;
-            if (!isCsvOrJson) {
-                scanRangeParams = params;
-            } else {
-                // If fileFormatType is csv/json format, uncompressed files may be coexists with compressed files
-                // So we need set compressType separately
-                scanRangeParams = new TFileScanRangeParams(params);
-                scanRangeParams.setCompressType(getFileCompressType(fileSplit));
+            // set hdfs params for hdfs file type.
+            Map<String, String> locationProperties = getLocationProperties();
+            if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+                scanRangeParams.setProperties(locationProperties);
+            } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+                String fsName = getFsName(fileSplit);
+                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
+                tHdfsParams.setFsName(fsName);
+                scanRangeParams.setHdfsParams(tHdfsParams);
+
+                if (locationType == TFileType.FILE_BROKER) {
+                    FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                    if (broker == null) {
+                        throw new UserException("No alive broker.");
+                    }
+                    scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
+                }
+            } else if (locationType == TFileType.FILE_S3) {
+                scanRangeParams.setProperties(locationProperties);
             }
+
             TScanRangeLocations curLocations = newLocations(scanRangeParams);
 
             // If fileSplit has partition values, use the values collected from hive partitions.
@@ -288,7 +279,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
                     ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
                     : fileSplit.getPartitionValues();
 
-            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
+                    locationType);
             if (isACID) {
                 HiveSplit hiveSplit = (HiveSplit) split;
                 hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -354,7 +346,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
     }
 
     private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
-                                               List<String> columnsFromPathKeys)
+                                               List<String> columnsFromPathKeys, TFileType locationType)
             throws UserException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         rangeDesc.setStartOffset(fileSplit.getStart());
@@ -365,11 +357,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
-        if (getLocationType() == TFileType.FILE_HDFS) {
+        if (locationType == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3
-                || getLocationType() == TFileType.FILE_BROKER
-                || getLocationType() == TFileType.FILE_NET) {
+        } else if (locationType == TFileType.FILE_S3
+                || locationType == TFileType.FILE_BROKER
+                || locationType == TFileType.FILE_NET) {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
@@ -379,6 +371,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
     protected abstract TFileType getLocationType() throws UserException;
 
+    protected abstract TFileType getLocationType(String location) throws UserException;
+
     protected abstract TFileFormatType getFileFormatType() throws UserException;
 
     protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 710fed10dd..ec85f8eb25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -230,7 +230,11 @@ public class HiveScanNode extends FileQueryScanNode {
 
     @Override
     protected TFileType getLocationType() throws UserException {
-        String location = hmsTable.getRemoteTable().getSd().getLocation();
+        return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
         return getTFileType(location).orElseThrow(() ->
             new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 102292e4c0..11e9bafd86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -53,6 +53,11 @@ public class MaxComputeScanNode extends FileQueryScanNode {
 
     @Override
     protected TFileType getLocationType() throws UserException {
+        return getLocationType(null);
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
         return TFileType.FILE_NET;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index 0dfb78abed..476e16b098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -84,6 +84,11 @@ public class TVFScanNode extends FileQueryScanNode {
 
     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        return getLocationType(null);
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
         return tableValuedFunction.getTFileType();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index bc23cec092..2de2f8291c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -244,6 +244,12 @@ public class IcebergScanNode extends FileQueryScanNode {
     public TFileType getLocationType() throws UserException {
         Table icebergTable = source.getIcebergTable();
         String location = icebergTable.location();
+        return getLocationType(location);
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws UserException {
+        Table icebergTable = source.getIcebergTable();
         return getTFileType(location).orElseThrow(() ->
             new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index d8fcca48ac..a31ba1340b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -141,7 +141,11 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
-        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        return getLocationType(((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
         if (location != null && !location.isEmpty()) {
             if (S3Util.isObjStorage(location)) {
                 return TFileType.FILE_S3;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out b/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
new file mode 100644
index 0000000000..e4344d897f
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
@@ -0,0 +1,37 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !01 --
+1      Tom     48      shanghai        male    20230101
+2      Jerry   35      guangzhou       male    20230101
+3      Frank   25      hangzhou        male    20230101
+4      Ada     22      beijing female  20230101
+5      Jason   46      shanghai        male    20230102
+6      Andy    38      guangzhou       male    20230102
+7      Sam     29      hangzhou        male    20230102
+8      Chloea  18      beijing female  20230102
+
+-- !02 --
+8
+
+-- !03 --
+guangzhou      2
+hangzhou       2
+shanghai       2
+
+-- !01 --
+1      Tom     48      shanghai        male    20230101
+2      Jerry   35      guangzhou       male    20230101
+3      Frank   25      hangzhou        male    20230101
+4      Ada     22      beijing female  20230101
+5      Jason   46      shanghai        male    20230102
+6      Andy    38      guangzhou       male    20230102
+7      Sam     29      hangzhou        male    20230102
+8      Chloea  18      beijing female  20230102
+
+-- !02 --
+8
+
+-- !03 --
+guangzhou      2
+hangzhou       2
+shanghai       2
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy b/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy
new file mode 100644
index 0000000000..ec092f99e7
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mixed_par_locations", "p2") {
+
+    def formats = ["_parquet", "_orc"]
+    def q1 = """select * from test_mixed_par_locationsSUFFIX order by id;"""
+    def q2 = """select count(id) from test_mixed_par_locationsSUFFIX;"""
+    def q3 = """select city, count(*) from test_mixed_par_locations_parquet where sex = 'male' group by city order by city;"""
+
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+            String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+            String extAk = context.config.otherConfigs.get("extAk");
+            String extSk = context.config.otherConfigs.get("extSk");
+            String ext3Endpoint = context.config.otherConfigs.get("ext3Endpoint");
+            String extS3Region = context.config.otherConfigs.get("extS3Region");
+            String catalog_name = "test_mixed_par_locations"
+
+            sql """drop catalog if exists ${catalog_name};"""
+            sql """
+                create catalog if not exists ${catalog_name} properties (
+                    'type'='hms',
+                    'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
+                    'AWS_ACCESS_KEY' = "${extAk}",
+                    'AWS_SECRET_KEY' = "${extSk}",
+                    'AWS_ENDPOINT' = "${ext3Endpoint}",
+                    'AWS_REGION' = "${extS3Region}"
+                );
+            """
+            logger.info("catalog " + catalog_name + " created")
+            sql """switch ${catalog_name};"""
+            logger.info("switched to catalog " + catalog_name)
+            sql """use multi_catalog;"""
+            logger.info("use multi_catalog")
+
+            for (String format in formats) {
+                logger.info("Process format " + format)
+                qt_01 q1.replace("SUFFIX", format)
+                qt_02 q2.replace("SUFFIX", format)
+                qt_03 q3.replace("SUFFIX", format)
+            }
+            sql """drop catalog if exists ${catalog_name}"""
+        } finally {
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
