This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 09000743858 [BugFix](MultiCatalog) Fix oss file location is not
available in iceberg hadoop catalog (#30761)
09000743858 is described below
commit 09000743858323d8d51e7d4213a2c4df19e2610c
Author: GoGoWen <[email protected]>
AuthorDate: Tue Feb 6 08:31:59 2024 +0800
[BugFix](MultiCatalog) Fix oss file location is not available in iceberg
hadoop catalog (#30761)
1, create iceberg hadoop catalog like below:
CREATE CATALOG iceberg_catalog PROPERTIES (
"warehouse" = "s3a://xxx/xxx",
"type" = "iceberg",
"s3.secret_key" = "*XXX",
"s3.region" = "region",
"s3.endpoint" = "http://xxx.jd.local",
"s3.bucket" = "xxx-test",
"s3.access_key" = "xxxxx",
"iceberg.catalog.type" = "hadoop",
"fs.s3a.impl" = "org.apache.hadoop.fs.s3a.S3AFileSystem",
"create_time" = "2024-02-02 11:15:28.570"
);
2, run select * from iceberg_catalog.table limit 1;
will get errCode = 2, detailMessage = Unknown file location
nullnulls3a:/xxxx
expect:
OK
also needs to be backported to branch-2.0
---
.../org/apache/doris/common/util/LocationPath.java | 191 ++++++++++++---------
.../planner/external/iceberg/IcebergScanNode.java | 9 +-
.../apache/doris/common/util/LocationPathTest.java | 14 ++
3 files changed, 127 insertions(+), 87 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 0ddba406cdc..fd7da29e519 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -39,6 +39,8 @@ import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@@ -49,7 +51,7 @@ public class LocationPath {
private final LocationType locationType;
private final String location;
- enum LocationType {
+ public enum LocationType {
HDFS,
LOCAL, // Local File
BOS, // Baidu
@@ -66,7 +68,8 @@ public class LocationPath {
S3A,
S3N,
VIEWFS,
- UNKNOWN
+ UNKNOWN,
+ NOSCHEME // no scheme info
}
private LocationPath(String location) {
@@ -75,107 +78,123 @@ public class LocationPath {
public LocationPath(String location, Map<String, String> props) {
String scheme = parseScheme(location).toLowerCase();
- switch (scheme) {
- case FeConstants.FS_PREFIX_HDFS:
- locationType = LocationType.HDFS;
- // Need add hdfs host to location
- String host = props.get(HdfsResource.DSF_NAMESERVICES);
- this.location = normalizedHdfsPath(location, host);
- break;
- case FeConstants.FS_PREFIX_S3:
- locationType = LocationType.S3;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_S3A:
- locationType = LocationType.S3A;
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_S3N:
- // include the check for multi locations and in a table, such
as both s3 and hdfs are in a table.
- locationType = LocationType.S3N;
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_BOS:
- locationType = LocationType.BOS;
- // use s3 client to access
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_GCS:
- locationType = LocationType.GCS;
- // use s3 client to access
- this.location = convertToS3(location);
- break;
- case FeConstants.FS_PREFIX_OSS:
- if (isHdfsOnOssEndpoint(location)) {
- locationType = LocationType.OSS_HDFS;
+ if (scheme.isEmpty()) {
+ locationType = LocationType.NOSCHEME;
+ this.location = location;
+ } else {
+ switch (scheme) {
+ case FeConstants.FS_PREFIX_HDFS:
+ locationType = LocationType.HDFS;
+ // Need add hdfs host to location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+ this.location = normalizedHdfsPath(location, host);
+ break;
+ case FeConstants.FS_PREFIX_S3:
+ locationType = LocationType.S3;
this.location = location;
- } else {
+ break;
+ case FeConstants.FS_PREFIX_S3A:
+ locationType = LocationType.S3A;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_S3N:
+ // include the check for multi locations and in a table,
such as both s3 and hdfs are in a table.
+ locationType = LocationType.S3N;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_BOS:
+ locationType = LocationType.BOS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_GCS:
+ locationType = LocationType.GCS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_OSS:
+ if (isHdfsOnOssEndpoint(location)) {
+ locationType = LocationType.OSS_HDFS;
+ this.location = location;
+ } else {
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OSS;
+ }
+ break;
+ case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
this.location = location;
}
- locationType = LocationType.OSS;
- }
- break;
- case FeConstants.FS_PREFIX_COS:
- if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
- } else {
+ locationType = LocationType.COS;
+ break;
+ case FeConstants.FS_PREFIX_OBS:
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OBS;
+ break;
+ case FeConstants.FS_PREFIX_OFS:
+ locationType = LocationType.OFS;
this.location = location;
- }
- locationType = LocationType.COS;
- break;
- case FeConstants.FS_PREFIX_OBS:
- if (useS3EndPoint(props)) {
- this.location = convertToS3(location);
- } else {
+ break;
+ case FeConstants.FS_PREFIX_JFS:
+ locationType = LocationType.JFS;
this.location = location;
- }
- locationType = LocationType.OBS;
- break;
- case FeConstants.FS_PREFIX_OFS:
- locationType = LocationType.OFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_JFS:
- locationType = LocationType.JFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_GFS:
- locationType = LocationType.GFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_COSN:
- // if treat cosn(tencent hadoop-cos) as a s3 file system, may
bring incompatible issues
- locationType = LocationType.COSN;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_VIEWFS:
- locationType = LocationType.VIEWFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_FILE:
- locationType = LocationType.LOCAL;
- this.location = location;
- break;
- default:
- locationType = LocationType.UNKNOWN;
- this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_GFS:
+ locationType = LocationType.GFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_COSN:
+ // if treat cosn(tencent hadoop-cos) as a s3 file system,
may bring incompatible issues
+ locationType = LocationType.COSN;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_VIEWFS:
+ locationType = LocationType.VIEWFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_FILE:
+ locationType = LocationType.LOCAL;
+ this.location = location;
+ break;
+ default:
+ locationType = LocationType.UNKNOWN;
+ this.location = location;
+ }
}
}
private static String parseScheme(String location) {
+ String scheme = "";
String[] schemeSplit = location.split(SCHEME_DELIM);
if (schemeSplit.length > 1) {
- return schemeSplit[0];
+ scheme = schemeSplit[0];
} else {
schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
if (schemeSplit.length > 1) {
- return schemeSplit[0];
+ scheme = schemeSplit[0];
}
- throw new IllegalArgumentException("Fail to parse scheme, invalid
location: " + location);
}
+
+        // if no scheme was parsed, treat a path like /path/to/local as scheme-less
+ if (scheme.isEmpty()) {
+ try {
+ Paths.get(location);
+ } catch (InvalidPathException exception) {
+ throw new IllegalArgumentException("Fail to parse scheme,
invalid location: " + location);
+ }
+ }
+
+ return scheme;
}
private boolean useS3EndPoint(Map<String, String> props) {
@@ -196,6 +215,7 @@ public class LocationPath {
/**
* The converted path is used for FE to get metadata
+ *
* @param location origin location
* @return metadata location path. just convert when storage is compatible
with s3 client.
*/
@@ -219,7 +239,7 @@ public class LocationPath {
// Need to encode these characters before creating URI.
// But doesn't encode '/' and ':' so that we can get the correct
uri host.
location = URLEncoder.encode(location,
StandardCharsets.UTF_8.name())
- .replace("%2F", "/").replace("%3A", ":");
+ .replace("%2F", "/").replace("%3A", ":");
URI normalizedUri = new URI(location);
// compatible with 'hdfs:///' or 'hdfs:/'
if (StringUtils.isEmpty(normalizedUri.getHost())) {
@@ -336,6 +356,7 @@ public class LocationPath {
/**
* The converted path is used for BE
+ *
* @return BE scan range path
*/
public Path toScanRangeLocation() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 7d055d5a8b1..2a0de32acbc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -352,11 +352,16 @@ public class IcebergScanNode extends FileQueryScanNode {
private String normalizeLocation(String location) {
Map<String, String> props = source.getCatalog().getProperties();
+ LocationPath locationPath = new LocationPath(location, props);
String icebergCatalogType =
props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
if ("hadoop".equalsIgnoreCase(icebergCatalogType)) {
- if (!location.startsWith(HdfsResource.HDFS_PREFIX)) {
+            // if there is no scheme info, fill with HADOOP_FS_NAME;
+            // if HADOOP_FS_NAME is absent, it should be the local file system
+ if (locationPath.getLocationType() ==
LocationPath.LocationType.NOSCHEME) {
String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
- location = fsName + location;
+ if (fsName != null) {
+ location = fsName + location;
+ }
}
}
return location;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
index 71ee9100ffc..571826aa9c8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -171,8 +171,22 @@ public class LocationPathTest {
LocationPath locationPath = new LocationPath("unknown://test.com",
rangeProps);
// FE
Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
+ Assertions.assertTrue(locationPath.getLocationType() ==
LocationPath.LocationType.UNKNOWN);
// BE
String beLocation = locationPath.toScanRangeLocation().toString();
Assertions.assertTrue(beLocation.startsWith("unknown://"));
}
+
+ @Test
+ public void testNoSchemeLocation() {
+ // when use unknown location, pass to BE
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("/path/to/local",
rangeProps);
+ // FE
+
Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local"));
+ Assertions.assertTrue(locationPath.getLocationType() ==
LocationPath.LocationType.NOSCHEME);
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]