This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8e147f4c93eff20be5668b7498efc958fb29eed8 Author: GoGoWen <[email protected]> AuthorDate: Tue Feb 6 08:31:59 2024 +0800 [BugFix](MultiCatalog) Fix oss file location is not available in iceberg hadoop catalog (#30761) 1. Create an iceberg hadoop catalog like below: CREATE CATALOG iceberg_catalog PROPERTIES ( "warehouse" = "s3a://xxx/xxx", "type" = "iceberg", "s3.secret_key" = "*XXX", "s3.region" = "region", "s3.endpoint" = "http://xxx.jd.local", "s3.bucket" = "xxx-test", "s3.access_key" = "xxxxx", "iceberg.catalog.type" = "hadoop", "fs.s3a.impl" = "org.apache.hadoop.fs.s3a.S3AFileSystem", "create_time" = "2024-02-02 11:15:28.570" ); 2. Run: select * from iceberg_catalog.table limit 1; Actual result: errCode = 2, detailMessage = Unknown file location nullnulls3a:/xxxx Expected result: OK. This fix also needs to be backported (bp) to branch-2.0. --- .../org/apache/doris/common/util/LocationPath.java | 191 ++++++++++++--------- .../planner/external/iceberg/IcebergScanNode.java | 9 +- .../apache/doris/common/util/LocationPathTest.java | 14 ++ 3 files changed, 127 insertions(+), 87 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index 0ddba406cdc..fd7da29e519 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -39,6 +39,8 @@ import java.net.URISyntaxException; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.nio.file.InvalidPathException; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -49,7 +51,7 @@ public class LocationPath { private final LocationType locationType; private final String location; - enum LocationType { + public enum LocationType { HDFS, LOCAL, // Local File BOS, // Baidu @@ -66,7 +68,8 @@ public class LocationPath { S3A, S3N, VIEWFS, - UNKNOWN + UNKNOWN, + NOSCHEME // 
no scheme info } private LocationPath(String location) { @@ -75,107 +78,123 @@ public class LocationPath { public LocationPath(String location, Map<String, String> props) { String scheme = parseScheme(location).toLowerCase(); - switch (scheme) { - case FeConstants.FS_PREFIX_HDFS: - locationType = LocationType.HDFS; - // Need add hdfs host to location - String host = props.get(HdfsResource.DSF_NAMESERVICES); - this.location = normalizedHdfsPath(location, host); - break; - case FeConstants.FS_PREFIX_S3: - locationType = LocationType.S3; - this.location = location; - break; - case FeConstants.FS_PREFIX_S3A: - locationType = LocationType.S3A; - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_S3N: - // include the check for multi locations and in a table, such as both s3 and hdfs are in a table. - locationType = LocationType.S3N; - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_BOS: - locationType = LocationType.BOS; - // use s3 client to access - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_GCS: - locationType = LocationType.GCS; - // use s3 client to access - this.location = convertToS3(location); - break; - case FeConstants.FS_PREFIX_OSS: - if (isHdfsOnOssEndpoint(location)) { - locationType = LocationType.OSS_HDFS; + if (scheme.isEmpty()) { + locationType = LocationType.NOSCHEME; + this.location = location; + } else { + switch (scheme) { + case FeConstants.FS_PREFIX_HDFS: + locationType = LocationType.HDFS; + // Need add hdfs host to location + String host = props.get(HdfsResource.DSF_NAMESERVICES); + this.location = normalizedHdfsPath(location, host); + break; + case FeConstants.FS_PREFIX_S3: + locationType = LocationType.S3; this.location = location; - } else { + break; + case FeConstants.FS_PREFIX_S3A: + locationType = LocationType.S3A; + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_S3N: + // include the check for multi locations and in a 
table, such as both s3 and hdfs are in a table. + locationType = LocationType.S3N; + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_BOS: + locationType = LocationType.BOS; + // use s3 client to access + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_GCS: + locationType = LocationType.GCS; + // use s3 client to access + this.location = convertToS3(location); + break; + case FeConstants.FS_PREFIX_OSS: + if (isHdfsOnOssEndpoint(location)) { + locationType = LocationType.OSS_HDFS; + this.location = location; + } else { + if (useS3EndPoint(props)) { + this.location = convertToS3(location); + } else { + this.location = location; + } + locationType = LocationType.OSS; + } + break; + case FeConstants.FS_PREFIX_COS: if (useS3EndPoint(props)) { this.location = convertToS3(location); } else { this.location = location; } - locationType = LocationType.OSS; - } - break; - case FeConstants.FS_PREFIX_COS: - if (useS3EndPoint(props)) { - this.location = convertToS3(location); - } else { + locationType = LocationType.COS; + break; + case FeConstants.FS_PREFIX_OBS: + if (useS3EndPoint(props)) { + this.location = convertToS3(location); + } else { + this.location = location; + } + locationType = LocationType.OBS; + break; + case FeConstants.FS_PREFIX_OFS: + locationType = LocationType.OFS; this.location = location; - } - locationType = LocationType.COS; - break; - case FeConstants.FS_PREFIX_OBS: - if (useS3EndPoint(props)) { - this.location = convertToS3(location); - } else { + break; + case FeConstants.FS_PREFIX_JFS: + locationType = LocationType.JFS; this.location = location; - } - locationType = LocationType.OBS; - break; - case FeConstants.FS_PREFIX_OFS: - locationType = LocationType.OFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_JFS: - locationType = LocationType.JFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_GFS: - locationType = LocationType.GFS; - this.location = 
location; - break; - case FeConstants.FS_PREFIX_COSN: - // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues - locationType = LocationType.COSN; - this.location = location; - break; - case FeConstants.FS_PREFIX_VIEWFS: - locationType = LocationType.VIEWFS; - this.location = location; - break; - case FeConstants.FS_PREFIX_FILE: - locationType = LocationType.LOCAL; - this.location = location; - break; - default: - locationType = LocationType.UNKNOWN; - this.location = location; + break; + case FeConstants.FS_PREFIX_GFS: + locationType = LocationType.GFS; + this.location = location; + break; + case FeConstants.FS_PREFIX_COSN: + // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues + locationType = LocationType.COSN; + this.location = location; + break; + case FeConstants.FS_PREFIX_VIEWFS: + locationType = LocationType.VIEWFS; + this.location = location; + break; + case FeConstants.FS_PREFIX_FILE: + locationType = LocationType.LOCAL; + this.location = location; + break; + default: + locationType = LocationType.UNKNOWN; + this.location = location; + } } } private static String parseScheme(String location) { + String scheme = ""; String[] schemeSplit = location.split(SCHEME_DELIM); if (schemeSplit.length > 1) { - return schemeSplit[0]; + scheme = schemeSplit[0]; } else { schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM); if (schemeSplit.length > 1) { - return schemeSplit[0]; + scheme = schemeSplit[0]; } - throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location); } + + // if not get scheme, need consider /path/to/local to no scheme + if (scheme.isEmpty()) { + try { + Paths.get(location); + } catch (InvalidPathException exception) { + throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location); + } + } + + return scheme; } private boolean useS3EndPoint(Map<String, String> props) { @@ -196,6 +215,7 @@ public class LocationPath { /** * The 
converted path is used for FE to get metadata + * * @param location origin location * @return metadata location path. just convert when storage is compatible with s3 client. */ @@ -219,7 +239,7 @@ public class LocationPath { // Need to encode these characters before creating URI. // But doesn't encode '/' and ':' so that we can get the correct uri host. location = URLEncoder.encode(location, StandardCharsets.UTF_8.name()) - .replace("%2F", "/").replace("%3A", ":"); + .replace("%2F", "/").replace("%3A", ":"); URI normalizedUri = new URI(location); // compatible with 'hdfs:///' or 'hdfs:/' if (StringUtils.isEmpty(normalizedUri.getHost())) { @@ -336,6 +356,7 @@ public class LocationPath { /** * The converted path is used for BE + * * @return BE scan range path */ public Path toScanRangeLocation() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 7d055d5a8b1..2a0de32acbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -352,11 +352,16 @@ public class IcebergScanNode extends FileQueryScanNode { private String normalizeLocation(String location) { Map<String, String> props = source.getCatalog().getProperties(); + LocationPath locationPath = new LocationPath(location, props); String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE); if ("hadoop".equalsIgnoreCase(icebergCatalogType)) { - if (!location.startsWith(HdfsResource.HDFS_PREFIX)) { + // if no scheme info, fill will HADOOP_FS_NAME + // if no HADOOP_FS_NAME, then should be local file system + if (locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME) { String fsName = props.get(HdfsResource.HADOOP_FS_NAME); - location = fsName + location; + if (fsName != null) { + location = fsName + 
location; + } } } return location; diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index 71ee9100ffc..571826aa9c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -171,8 +171,22 @@ public class LocationPathTest { LocationPath locationPath = new LocationPath("unknown://test.com", rangeProps); // FE Assertions.assertTrue(locationPath.get().startsWith("unknown://")); + Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.UNKNOWN); // BE String beLocation = locationPath.toScanRangeLocation().toString(); Assertions.assertTrue(beLocation.startsWith("unknown://")); } + + @Test + public void testNoSchemeLocation() { + // when use unknown location, pass to BE + Map<String, String> rangeProps = new HashMap<>(); + LocationPath locationPath = new LocationPath("/path/to/local", rangeProps); + // FE + Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local")); + Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME); + // BE + String beLocation = locationPath.toScanRangeLocation().toString(); + Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local")); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
