This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f80df20b6f [Fix](multi-catalog) Fix read error in mixed partition locations. (#21399)
f80df20b6f is described below
commit f80df20b6ffe254f42ebc8e4575a1b8fe278155b
Author: Qi Chen <[email protected]>
AuthorDate: Mon Jul 3 15:14:17 2023 +0800
[Fix](multi-catalog) Fix read error in mixed partition locations. (#21399)
Issue Number: close #20948
Fix read error in mixed partition locations (for example, some partition locations are on S3 while others are on HDFS) by calling `getLocationType` at the file-split level instead of the table level.
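The core of the change is resolving the storage type from each split's own path rather than once from the table's root location. Below is a minimal, self-contained sketch of that per-path resolution; the TFileType enum is a stand-in and the scheme list is illustrative, not the exact Doris logic:

    import java.net.URI;

    public class PerSplitLocationDemo {
        enum TFileType { FILE_HDFS, FILE_S3, FILE_LOCAL }

        // Resolve the file type from a split path, so a table whose
        // partitions span HDFS and S3 plans each split correctly.
        static TFileType resolve(String location) {
            String scheme = URI.create(location).getScheme();
            if (scheme == null) {
                return TFileType.FILE_LOCAL;
            }
            switch (scheme.toLowerCase()) {
                case "s3":
                case "s3a":
                case "s3n":
                    return TFileType.FILE_S3;
                case "hdfs":
                    return TFileType.FILE_HDFS;
                default:
                    return TFileType.FILE_LOCAL;
            }
        }

        public static void main(String[] args) {
            // Two partitions of one table on different storage systems.
            System.out.println(resolve("hdfs://nn:8020/warehouse/t/par=20230101")); // FILE_HDFS
            System.out.println(resolve("s3://bucket/warehouse/t/par=20230102"));    // FILE_S3
        }
    }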
---
.../doris/planner/external/FileQueryScanNode.java | 82 ++++++++++------------
.../doris/planner/external/HiveScanNode.java | 6 +-
.../doris/planner/external/MaxComputeScanNode.java | 5 ++
.../apache/doris/planner/external/TVFScanNode.java | 5 ++
.../planner/external/iceberg/IcebergScanNode.java | 6 ++
.../planner/external/paimon/PaimonScanNode.java | 6 +-
.../hive/test_mixed_par_locations.out | 37 ++++++++++
.../hive/test_mixed_par_locations.groovy | 63 +++++++++++++++++
8 files changed, 164 insertions(+), 46 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index ed004ff170..39727a6f04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -230,51 +230,42 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (inputSplits.isEmpty()) {
return;
}
- FileSplit inputSplit = (FileSplit) inputSplits.get(0);
- TFileType locationType = getLocationType();
- params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
params.setFormatType(fileFormatType);
- TFileCompressType fileCompressType = getFileCompressType(inputSplit);
- params.setCompressType(fileCompressType);
- boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
- if (isCsvOrJson) {
- params.setFileAttributes(getFileAttributes());
- }
-
- // set hdfs params for hdfs file type.
- Map<String, String> locationProperties = getLocationProperties();
- if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
- params.setProperties(locationProperties);
- }
- if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
- String fsName = getFsName(inputSplit);
- THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
- tHdfsParams.setFsName(fsName);
- params.setHdfsParams(tHdfsParams);
-
- if (locationType == TFileType.FILE_BROKER) {
- FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
- if (broker == null) {
- throw new UserException("No alive broker.");
- }
- params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
- }
- }
-
List<String> pathPartitionKeys = getPathPartitionKeys();
for (Split split : inputSplits) {
+ TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
FileSplit fileSplit = (FileSplit) split;
+ TFileType locationType = getLocationType(fileSplit.getPath().toString());
+ scanRangeParams.setFileType(locationType);
+ TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+ scanRangeParams.setCompressType(fileCompressType);
+ boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
+ if (isCsvOrJson) {
+ scanRangeParams.setFileAttributes(getFileAttributes());
+ }
- TFileScanRangeParams scanRangeParams;
- if (!isCsvOrJson) {
- scanRangeParams = params;
- } else {
- // If fileFormatType is csv/json format, uncompressed files may be coexists with compressed files
- // So we need set compressType separately
- scanRangeParams = new TFileScanRangeParams(params);
- scanRangeParams.setCompressType(getFileCompressType(fileSplit));
+ // set hdfs params for hdfs file type.
+ Map<String, String> locationProperties = getLocationProperties();
+ if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+ scanRangeParams.setProperties(locationProperties);
+ } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+ String fsName = getFsName(fileSplit);
+ THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
+ tHdfsParams.setFsName(fsName);
+ scanRangeParams.setHdfsParams(tHdfsParams);
+
+ if (locationType == TFileType.FILE_BROKER) {
+ FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+ if (broker == null) {
+ throw new UserException("No alive broker.");
+ }
+ scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
+ }
+ } else if (locationType == TFileType.FILE_S3) {
+ scanRangeParams.setProperties(locationProperties);
}
+
TScanRangeLocations curLocations = newLocations(scanRangeParams);
// If fileSplit has partition values, use the values collected from hive partitions.
@@ -288,7 +279,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
: fileSplit.getPartitionValues();
- TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
+ TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
+ locationType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) split;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -354,7 +346,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
- List<String> columnsFromPathKeys)
+ List<String> columnsFromPathKeys, TFileType locationType)
throws UserException {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
@@ -365,11 +357,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
- if (getLocationType() == TFileType.FILE_HDFS) {
+ if (locationType == TFileType.FILE_HDFS) {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
- } else if (getLocationType() == TFileType.FILE_S3
- || getLocationType() == TFileType.FILE_BROKER
- || getLocationType() == TFileType.FILE_NET) {
+ } else if (locationType == TFileType.FILE_S3
+ || locationType == TFileType.FILE_BROKER
+ || locationType == TFileType.FILE_NET) {
// need full path
rangeDesc.setPath(fileSplit.getPath().toString());
}
@@ -379,6 +371,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected abstract TFileType getLocationType() throws UserException;
+ protected abstract TFileType getLocationType(String location) throws UserException;
+
protected abstract TFileFormatType getFileFormatType() throws UserException;
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
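Each concrete scan node then implements the new per-path overload and routes the old no-arg method through it, as the diffs below show. A generic sketch of that delegation pattern, with illustrative names and bodies rather than Doris code:

    public class DelegationDemo {
        abstract static class ScanNodeBase {
            // Old table-level entry point, kept for existing callers.
            abstract String getLocationType() throws Exception;

            // New per-path overload, called once per file split.
            abstract String getLocationType(String location) throws Exception;
        }

        static class ExampleScanNode extends ScanNodeBase {
            private final String tableLocation = "s3://bucket/warehouse/tbl";

            @Override
            String getLocationType() throws Exception {
                // Table-level callers fall through to the per-path overload.
                return getLocationType(tableLocation);
            }

            @Override
            String getLocationType(String location) throws Exception {
                return location.startsWith("s3") ? "FILE_S3" : "FILE_HDFS";
            }
        }

        public static void main(String[] args) throws Exception {
            ScanNodeBase node = new ExampleScanNode();
            System.out.println(node.getLocationType());                                  // FILE_S3
            System.out.println(node.getLocationType("hdfs://nn:8020/warehouse/tbl/p1")); // FILE_HDFS
        }
    }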
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 710fed10dd..ec85f8eb25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -230,7 +230,11 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
protected TFileType getLocationType() throws UserException {
- String location = hmsTable.getRemoteTable().getSd().getLocation();
+ return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
+ }
+
+ @Override
+ protected TFileType getLocationType(String location) throws UserException {
return getTFileType(location).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for hms
table " + hmsTable.getName()));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 102292e4c0..11e9bafd86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -53,6 +53,11 @@ public class MaxComputeScanNode extends FileQueryScanNode {
@Override
protected TFileType getLocationType() throws UserException {
+ return getLocationType(null);
+ }
+
+ @Override
+ protected TFileType getLocationType(String location) throws UserException {
return TFileType.FILE_NET;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index 0dfb78abed..476e16b098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -84,6 +84,11 @@ public class TVFScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+ return getLocationType(null);
+ }
+
+ @Override
+ public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
return tableValuedFunction.getTFileType();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index bc23cec092..2de2f8291c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -244,6 +244,12 @@ public class IcebergScanNode extends FileQueryScanNode {
public TFileType getLocationType() throws UserException {
Table icebergTable = source.getIcebergTable();
String location = icebergTable.location();
+ return getLocationType(location);
+ }
+
+ @Override
+ public TFileType getLocationType(String location) throws UserException {
+ Table icebergTable = source.getIcebergTable();
return getTFileType(location).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for
iceberg table " + icebergTable.name()));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index d8fcca48ac..a31ba1340b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -141,7 +141,11 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
- String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+ return getLocationType(((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+ }
+
+ @Override
+ public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
if (location != null && !location.isEmpty()) {
if (S3Util.isObjStorage(location)) {
return TFileType.FILE_S3;
diff --git a/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out b/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
new file mode 100644
index 0000000000..e4344d897f
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_mixed_par_locations.out
@@ -0,0 +1,37 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !01 --
+1 Tom 48 shanghai male 20230101
+2 Jerry 35 guangzhou male 20230101
+3 Frank 25 hangzhou male 20230101
+4 Ada 22 beijing female 20230101
+5 Jason 46 shanghai male 20230102
+6 Andy 38 guangzhou male 20230102
+7 Sam 29 hangzhou male 20230102
+8 Chloea 18 beijing female 20230102
+
+-- !02 --
+8
+
+-- !03 --
+guangzhou 2
+hangzhou 2
+shanghai 2
+
+-- !01 --
+1 Tom 48 shanghai male 20230101
+2 Jerry 35 guangzhou male 20230101
+3 Frank 25 hangzhou male 20230101
+4 Ada 22 beijing female 20230101
+5 Jason 46 shanghai male 20230102
+6 Andy 38 guangzhou male 20230102
+7 Sam 29 hangzhou male 20230102
+8 Chloea 18 beijing female 20230102
+
+-- !02 --
+8
+
+-- !03 --
+guangzhou 2
+hangzhou 2
+shanghai 2
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy b/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy
new file mode 100644
index 0000000000..ec092f99e7
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_mixed_par_locations.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mixed_par_locations", "p2") {
+
+ def formats = ["_parquet", "_orc"]
+ def q1 = """select * from test_mixed_par_locationsSUFFIX order by id;"""
+ def q2 = """select count(id) from test_mixed_par_locationsSUFFIX;"""
+ def q3 = """select city, count(*) from test_mixed_par_locations_parquet where sex = 'male' group by city order by city;"""
+
+ String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+ String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+ String extAk = context.config.otherConfigs.get("extAk");
+ String extSk = context.config.otherConfigs.get("extSk");
+ String ext3Endpoint = context.config.otherConfigs.get("ext3Endpoint");
+ String extS3Region = context.config.otherConfigs.get("extS3Region");
+ String catalog_name = "test_mixed_par_locations"
+
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ 'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
+ 'AWS_ACCESS_KEY' = "${extAk}",
+ 'AWS_SECRET_KEY' = "${extSk}",
+ 'AWS_ENDPOINT' = "${ext3Endpoint}",
+ 'AWS_REGION' = "${extS3Region}"
+ );
+ """
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """use multi_catalog;"""
+ logger.info("use multi_catalog")
+
+ for (String format in formats) {
+ logger.info("Process format " + format)
+ qt_01 q1.replace("SUFFIX", format)
+ qt_02 q2.replace("SUFFIX", format)
+ qt_03 q3.replace("SUFFIX", format)
+ }
+ sql """drop catalog if exists ${catalog_name}"""
+ } finally {
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]