This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e431c2b9801 [Improvement](multi-catalog)make location easier to modify, decoupling all storage with a single location class (#27874)
e431c2b9801 is described below
commit e431c2b980174b8a90f0500d59a559903fc36eb8
Author: slothever <[email protected]>
AuthorDate: Wed Dec 6 00:13:54 2023 +0800
[Improvement](multi-catalog)make location easier to modify, decoupling all storage with a single location class (#27874)

decoupling all storage with a single location class
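A minimal sketch of the call pattern this change converges on (illustrative only; the class names come from this patch, while the sample bucket and path are made up):

    import org.apache.doris.common.util.LocationPath;
    import org.apache.hadoop.fs.Path;

    import java.util.HashMap;
    import java.util.Map;

    public class LocationPathUsageSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>(); // stand-in for catalog.getProperties()
            // One class now parses the scheme and decides how FE and BE read the file.
            LocationPath locationPath = new LocationPath("oss://bucket/dir/file.orc", props);
            System.out.println(locationPath.get());                  // FE keeps oss://bucket/dir/file.orc
            Path scanRangePath = locationPath.toScanRangeLocation(); // BE gets s3://bucket/dir/file.orc
            System.out.println(scanRangePath);
        }
    }

The call sites in HiveMetaStoreCache, HudiScanNode and IcebergScanNode below all switch to this pattern.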
---
.../java/org/apache/doris/common/FeConstants.java | 30 +-
.../org/apache/doris/common/util/LocationPath.java | 380 +++++++++++++++++++++
.../java/org/apache/doris/common/util/S3Util.java | 138 --------
.../doris/datasource/hive/HiveMetaStoreCache.java | 34 +-
.../datasource/property/PropertyConverter.java | 6 +-
.../org/apache/doris/fs/FileSystemFactory.java | 34 --
.../doris/planner/external/FileQueryScanNode.java | 32 --
.../doris/planner/external/HiveScanNode.java | 4 +-
.../doris/planner/external/hudi/HudiScanNode.java | 11 +-
.../planner/external/iceberg/IcebergScanNode.java | 12 +-
.../apache/doris/common/util/LocationPathTest.java | 178 ++++++++++
.../org/apache/doris/common/util/S3UtilTest.java | 104 ------
12 files changed, 609 insertions(+), 354 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 9d264b0c0ab..b9604009bed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -64,21 +64,21 @@ public class FeConstants {
// use \N to indicate NULL
public static String null_string = "\\N";
- public static String FS_PREFIX_S3 = "s3";
- public static String FS_PREFIX_S3A = "s3a";
- public static String FS_PREFIX_S3N = "s3n";
- public static String FS_PREFIX_OSS = "oss";
- public static String FS_PREFIX_GCS = "gs";
- public static String FS_PREFIX_BOS = "bos";
- public static String FS_PREFIX_COS = "cos";
- public static String FS_PREFIX_COSN = "cosn";
- public static String FS_PREFIX_OBS = "obs";
- public static String FS_PREFIX_OFS = "ofs";
- public static String FS_PREFIX_GFS = "gfs";
- public static String FS_PREFIX_JFS = "jfs";
- public static String FS_PREFIX_HDFS = "hdfs";
- public static String FS_PREFIX_VIEWFS = "viewfs";
- public static String FS_PREFIX_FILE = "file";
+ public static final String FS_PREFIX_S3 = "s3";
+ public static final String FS_PREFIX_S3A = "s3a";
+ public static final String FS_PREFIX_S3N = "s3n";
+ public static final String FS_PREFIX_OSS = "oss";
+ public static final String FS_PREFIX_GCS = "gs";
+ public static final String FS_PREFIX_BOS = "bos";
+ public static final String FS_PREFIX_COS = "cos";
+ public static final String FS_PREFIX_COSN = "cosn";
+ public static final String FS_PREFIX_OBS = "obs";
+ public static final String FS_PREFIX_OFS = "ofs";
+ public static final String FS_PREFIX_GFS = "gfs";
+ public static final String FS_PREFIX_JFS = "jfs";
+ public static final String FS_PREFIX_HDFS = "hdfs";
+ public static final String FS_PREFIX_VIEWFS = "viewfs";
+ public static final String FS_PREFIX_FILE = "file";
public static final String INTERNAL_DB_NAME = "__internal_schema";
public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
new file mode 100644
index 00000000000..d56e67bb0d1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.property.constants.CosProperties;
+import org.apache.doris.datasource.property.constants.ObsProperties;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.FileSystemType;
+import org.apache.doris.thrift.TFileType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocationPath {
+ private static final Logger LOG = LogManager.getLogger(LocationPath.class);
+ private static final String SCHEME_DELIM = "://";
+ private static final String NONSTANDARD_SCHEME_DELIM = ":/";
+ private final LocationType locationType;
+ private final String location;
+
+ enum LocationType {
+ HDFS,
+ LOCAL, // Local File
+ BOS, // Baidu
+ GCS, // Google
+ OBS, // Huawei
+ COS, // Tencent
+ COSN, // Tencent
+ OFS, // Tencent CHDFS
+ GFS, // Tencent GooseFS
+ OSS, // Alibaba
+ OSS_HDFS, // JindoFS on OSS
+ JFS, // JuiceFS
+ S3,
+ S3A,
+ S3N,
+ VIEWFS,
+ UNKNOWN
+ }
+
+ private LocationPath(String location) {
+ this(location, new HashMap<>());
+ }
+
+ public LocationPath(String location, Map<String, String> props) {
+ String scheme = parseScheme(location).toLowerCase();
+ switch (scheme) {
+ case FeConstants.FS_PREFIX_HDFS:
+ locationType = LocationType.HDFS;
// Need to add the hdfs host to the location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+ this.location = normalizedHdfsPath(location, host);
+ break;
+ case FeConstants.FS_PREFIX_S3:
+ locationType = LocationType.S3;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_S3A:
+ locationType = LocationType.S3A;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_S3N:
+ // include the check for multiple locations in a table, such as when both s3 and hdfs paths are in one table.
+ locationType = LocationType.S3N;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_BOS:
+ locationType = LocationType.BOS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_GCS:
+ locationType = LocationType.GCS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_OSS:
+ if (isHdfsOnOssEndpoint(location)) {
+ locationType = LocationType.OSS_HDFS;
+ this.location = location;
+ } else {
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OSS;
+ }
+ break;
+ case FeConstants.FS_PREFIX_COS:
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.COS;
+ break;
+ case FeConstants.FS_PREFIX_OBS:
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OBS;
+ break;
+ case FeConstants.FS_PREFIX_OFS:
+ locationType = LocationType.OFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_JFS:
+ locationType = LocationType.JFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_GFS:
+ locationType = LocationType.GFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_COSN:
+ // treating cosn (Tencent hadoop-cos) as an s3 file system may bring incompatibility issues
+ locationType = LocationType.COSN;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_VIEWFS:
+ locationType = LocationType.VIEWFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_FILE:
+ locationType = LocationType.LOCAL;
+ this.location = location;
+ break;
+ default:
+ locationType = LocationType.UNKNOWN;
+ this.location = location;
+ }
+ }
+
+ private static String parseScheme(String location) {
+ String[] schemeSplit = location.split(SCHEME_DELIM);
+ if (schemeSplit.length > 1) {
+ return schemeSplit[0];
+ } else {
+ schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
+ if (schemeSplit.length > 1) {
+ return schemeSplit[0];
+ }
+ throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
+ }
+ }
+
+ private boolean useS3EndPoint(Map<String, String> props) {
+ if (props.containsKey(ObsProperties.ENDPOINT)
+ || props.containsKey(OssProperties.ENDPOINT)
+ || props.containsKey(CosProperties.ENDPOINT)) {
+ return false;
+ }
+ // wide check range for the compatibility of s3 properties
+ return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT));
+ }
+
+ public static boolean isHdfsOnOssEndpoint(String location) {
+ // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
+ // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
+ return location.contains("oss-dls.aliyuncs");
+ }
+
+ /**
+ * The converted path is used for FE to get metadata
+ * @param location origin location
+ * @return metadata location path. Only converted when the storage is compatible with the s3 client.
+ */
+ private static String convertToS3(String location) {
+ LOG.debug("try convert location to s3 prefix: " + location);
+ int pos = findDomainPos(location);
+ return "s3" + location.substring(pos);
+ }
+
+ private static int findDomainPos(String rangeLocation) {
+ int pos = rangeLocation.indexOf("://");
+ if (pos == -1) {
+ throw new RuntimeException("No '://' found in location: " + rangeLocation);
+ }
+ return pos;
+ }
+
+ private static String normalizedHdfsPath(String location, String host) {
+ try {
+ // Hive partition may contain special characters such as ' ', '<', '>' and so on.
+ // Need to encode these characters before creating URI.
+ // But doesn't encode '/' and ':' so that we can get the correct uri host.
+ location = URLEncoder.encode(location, StandardCharsets.UTF_8.name())
+ .replace("%2F", "/").replace("%3A", ":");
+ URI normalizedUri = new URI(location);
+ // compatible with 'hdfs:///' or 'hdfs:/'
+ if (StringUtils.isEmpty(normalizedUri.getHost())) {
+ location = URLDecoder.decode(location, StandardCharsets.UTF_8.name());
+ String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
+ String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
+ if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
+ location = location.replace(brokenPrefix, normalizedPrefix);
+ }
+ if (StringUtils.isNotEmpty(host)) {
+ // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
+ // Or hdfs:///abc to hdfs://name_service/abc
+ return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
+ } else {
+ // 'hdfs://null/' equals the 'hdfs:///'
+ if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
+ // Do not support hdfs:///location
+ throw new RuntimeException("Invalid location with empty host: " + location);
+ } else {
+ // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE.
+ return location.replace(normalizedPrefix, "/");
+ }
+ }
+ }
+ return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
+ } catch (URISyntaxException | UnsupportedEncodingException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
+ LocationPath locationPath = new LocationPath(location);
+ FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
+ URI uri = locationPath.getPath().toUri();
+ String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
+ return Pair.of(fsType, fsIdent);
+ }
+
+ private FileSystemType getFileSystemType() {
+ FileSystemType fsType;
+ switch (locationType) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ // All storage will use the s3 client to access on BE, so we need to convert to s3
+ fsType = FileSystemType.S3;
+ break;
+ case COSN:
+ case OFS:
+ // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS
+ fsType = FileSystemType.OFS;
+ break;
+ case HDFS:
+ case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
+ case VIEWFS:
+ case GFS:
+ fsType = FileSystemType.DFS;
+ break;
+ case JFS:
+ fsType = FileSystemType.JFS;
+ break;
+ case LOCAL:
+ fsType = FileSystemType.FILE;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown file system for location: " + location);
+ }
+ return fsType;
+ }
+
+ /**
+ * provide file type for BE.
+ * @param location the location is from fs.listFile
+ * @return on BE, we will use TFileType to get the suitable client to access storage.
+ */
+ public static TFileType getTFileType(String location) {
+ if (location == null || location.isEmpty()) {
+ return null;
+ }
+ LocationPath locationPath = new LocationPath(location);
+ switch (locationPath.getLocationType()) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ // now we only support S3 client for object storage on BE
+ return TFileType.FILE_S3;
+ case HDFS:
+ case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
+ case VIEWFS:
+ case COSN:
+ return TFileType.FILE_HDFS;
+ case GFS:
+ case JFS:
+ case OFS:
+ return TFileType.FILE_BROKER;
+ case LOCAL:
+ return TFileType.FILE_LOCAL;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * The converted path is used for BE
+ * @return BE scan range path
+ */
+ public Path toScanRangeLocation() {
+ switch (locationType) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ // All storage will use the s3 client to access on BE, so we need to convert to s3
+ return new Path(convertToS3(location));
+ case HDFS:
+ case OSS_HDFS:
+ case VIEWFS:
+ case COSN:
+ case GFS:
+ case JFS:
+ case OFS:
+ case LOCAL:
+ default:
+ return getPath();
+ }
+ }
+
+ public LocationType getLocationType() {
+ return locationType;
+ }
+
+ public String get() {
+ return location;
+ }
+
+ public Path getPath() {
+ return new Path(location);
+ }
+
+ @Override
+ public String toString() {
+ return get();
+ }
+}
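Before the call-site diffs, a hedged sketch of the two static lookups that replace FileSystemFactory.getFSIdentity and FileQueryScanNode.getTFileType (assumes the fe-core classes above are on the classpath; the bucket/key values are invented):

    import org.apache.doris.common.Pair;
    import org.apache.doris.common.util.LocationPath;
    import org.apache.doris.fs.FileSystemType;
    import org.apache.doris.thrift.TFileType;

    public class LocationPathLookupSketch {
        public static void main(String[] args) {
            // Object storage is read through the s3 client on BE.
            TFileType type = LocationPath.getTFileType("cos://bucket/key");
            // cosn:// keeps Tencent CHDFS (OFS) as its FE file system type.
            Pair<FileSystemType, String> ident = LocationPath.getFSIdentity("cosn://bucket/key", null);
            System.out.println(type + " / " + ident.first + " / " + ident.second); // FILE_S3 / OFS / cosn://bucket
        }
    }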
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 98790bc9e83..2d40af321fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,18 +17,8 @@
package org.apache.doris.common.util;
-import org.apache.doris.catalog.HdfsResource;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
-import org.apache.doris.datasource.property.constants.CosProperties;
-import org.apache.doris.datasource.property.constants.ObsProperties;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -43,138 +33,10 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.util.Map;
public class S3Util {
- private static final Logger LOG = LogManager.getLogger(S3Util.class);
-
- public static boolean isObjStorage(String location) {
- return isObjStorageUseS3Client(location)
- // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
- || (location.startsWith(FeConstants.FS_PREFIX_COS) && !location.startsWith(FeConstants.FS_PREFIX_COSN))
- || location.startsWith(FeConstants.FS_PREFIX_OSS)
- || location.startsWith(FeConstants.FS_PREFIX_OBS);
- }
-
- private static boolean isObjStorageUseS3Client(String location) {
- return location.startsWith(FeConstants.FS_PREFIX_S3)
- || location.startsWith(FeConstants.FS_PREFIX_S3A)
- || location.startsWith(FeConstants.FS_PREFIX_S3N)
- || location.startsWith(FeConstants.FS_PREFIX_GCS)
- || location.startsWith(FeConstants.FS_PREFIX_BOS);
- }
-
- private static boolean isS3EndPoint(String location, Map<String, String> props) {
- if (props.containsKey(ObsProperties.ENDPOINT)
- || props.containsKey(OssProperties.ENDPOINT)
- || props.containsKey(CosProperties.ENDPOINT)) {
- return false;
- }
- // wide check range for the compatibility of s3 properties
- return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT))
- && isObjStorage(location);
- }
-
- /**
- * The converted path is used for FE to get metadata
- * @param location origin location
- * @return metadata location path. just convert when storage is compatible with s3 client.
- */
- public static String convertToS3IfNecessary(String location, Map<String, String> props) {
- LOG.debug("try convert location to s3 prefix: " + location);
- // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
- if (isS3EndPoint(location, props) || isObjStorageUseS3Client(location)) {
- int pos = location.indexOf("://");
- if (pos == -1) {
- throw new RuntimeException("No '://' found in location: " + location);
- }
- return "s3" + location.substring(pos);
- }
- return normalizedLocation(location, props);
- }
-
- private static String normalizedLocation(String location, Map<String, String> props) {
- try {
- if (location.startsWith(HdfsResource.HDFS_PREFIX)) {
- return normalizedHdfsPath(location, props);
- }
- return location;
- } catch (URISyntaxException | UnsupportedEncodingException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- private static String normalizedHdfsPath(String location, Map<String, String> props)
- throws URISyntaxException, UnsupportedEncodingException {
- // Hive partition may contain special characters such as ' ', '<', '>' and so on.
- // Need to encode these characters before creating URI.
- // But doesn't encode '/' and ':' so that we can get the correct uri host.
- location = URLEncoder.encode(location, StandardCharsets.UTF_8.name()).replace("%2F", "/").replace("%3A", ":");
- URI normalizedUri = new URI(location);
- // compatible with 'hdfs:///' or 'hdfs:/'
- if (StringUtils.isEmpty(normalizedUri.getHost())) {
- location = URLDecoder.decode(location, StandardCharsets.UTF_8.name());
- String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
- String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
- if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
- location = location.replace(brokenPrefix, normalizedPrefix);
- }
- // Need add hdfs host to location
- String host = props.get(HdfsResource.DSF_NAMESERVICES);
- if (StringUtils.isNotEmpty(host)) {
- // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
- // Or hdfs:///abc to hdfs://name_service/abc
- // TODO: check host in path when the 'dfs.nameservices' has multiple hosts
- return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
- } else {
- // 'hdfs://null/' equals the 'hdfs:///'
- if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
- // Do not support hdfs:///location
- throw new RuntimeException("Invalid location with empty host: " + location);
- } else {
- // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE.
- return location.replace(normalizedPrefix, "/");
- }
- }
- }
- return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
- }
-
- /**
- * The converted path is used for BE
- * @param location origin split path
- * @return BE scan range path
- */
- public static Path toScanRangeLocation(String location, Map<String, String> props) {
- // All storage will use s3 client on BE.
- if (isObjStorage(location)) {
- int pos = location.indexOf("://");
- if (pos == -1) {
- throw new RuntimeException("No '://' found in location: " + location);
- }
- if (isHdfsOnOssEndpoint(location)) {
- // if hdfs service is enabled on oss, use oss location
- // example: oss://examplebucket.cn-shanghai.oss-dls.aliyuncs.com/dir/file/0000.orc
- location = "oss" + location.substring(pos);
- } else {
- location = "s3" + location.substring(pos);
- }
- }
- return new Path(normalizedLocation(location, props));
- }
-
- public static boolean isHdfsOnOssEndpoint(String location) {
- // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
- // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
- return location.contains("oss-dls.aliyuncs");
- }
public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
StaticCredentialsProvider scp;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0ab5179ffa5..9ceb0066d30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -34,14 +34,13 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.CacheBulkLoader;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.fs.FileSystemCache;
-import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -361,7 +360,7 @@ public class HiveMetaStoreCache {
String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+ new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
location, bindBrokerName), jobConf, bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
try {
@@ -374,9 +373,10 @@ public class HiveMetaStoreCache {
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
for (RemoteFile remoteFile : locatedFiles.files()) {
- Path srcPath = remoteFile.getPath();
- Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
- if (!convertedPath.toString().equals(srcPath.toString())) {
+ String srcPath = remoteFile.getPath().toString();
+ LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
+ Path convertedPath = locationPath.toScanRangeLocation();
+ if (!convertedPath.toString().equals(srcPath)) {
remoteFile.setPath(convertedPath);
}
result.addFile(remoteFile);
@@ -400,13 +400,12 @@ public class HiveMetaStoreCache {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- String finalLocation = S3Util.convertToS3IfNecessary(key.location,
- catalog.getCatalogProperty().getProperties());
+ Map<String, String> props = catalog.getCatalogProperty().getProperties();
+ LocationPath finalLocation = new LocationPath(key.location, props);
// disable the fs cache in FileSystem, or it will always create a new FileSystem
// and save it in the cache when calling FileInputFormat.setInputPaths().
try {
- Path path = new Path(finalLocation);
- URI uri = path.toUri();
+ URI uri = finalLocation.getPath().toUri();
if (uri.getScheme() != null) {
String scheme = uri.getScheme();
updateJobConf("fs." + scheme + ".impl.disable.cache", "true");
@@ -419,13 +418,13 @@ public class HiveMetaStoreCache {
} catch (Exception e) {
LOG.warn("unknown scheme in path: " + finalLocation, e);
}
- FileInputFormat.setInputPaths(jobConf, finalLocation);
+ FileInputFormat.setInputPaths(jobConf, finalLocation.get());
try {
FileCacheValue result;
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = getFileCache(finalLocation, inputFormat, jobConf,
+ result = getFileCache(finalLocation.get(), inputFormat, jobConf,
key.getPartitionValues(), key.bindBrokerName);
} else {
InputSplit[] splits;
@@ -442,8 +441,9 @@ public class HiveMetaStoreCache {
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
// todo: get modification time
- Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
- catalog.getProperties());
+ String dataFilePath = fs.getPath().toString();
+ LocationPath locationPath = new LocationPath(dataFilePath, catalog.getProperties());
+ Path splitFilePath = locationPath.toScanRangeLocation();
result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
}
}
@@ -812,7 +812,7 @@ public class HiveMetaStoreCache {
String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
- FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+ LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
bindBrokerName), jobConf, bindBrokerName));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
@@ -835,7 +835,7 @@ public class HiveMetaStoreCache {
String location = delta.getPath().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
- FileSystemFactory.getFSIdentity(location, bindBrokerName),
+ LocationPath.getFSIdentity(location, bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
if (delta.isDeleteDelta()) {
@@ -855,7 +855,7 @@ public class HiveMetaStoreCache {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
- FileSystemFactory.getFSIdentity(location, bindBrokerName),
+ LocationPath.getFSIdentity(location, bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
locatedFiles.files().stream().filter(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
index 1b0e3b6d972..a6cad308839 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
@@ -17,7 +17,7 @@
package org.apache.doris.datasource.property;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.InitCatalogLog.Type;
@@ -301,7 +301,7 @@ public class PropertyConverter {
ossProperties.put("fs.oss.impl.disable.cache", "true");
ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss"));
boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false"));
- if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
+ if (LocationPath.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
// use endpoint or enable hdfs
rewriteHdfsOnOssProperties(ossProperties, endpoint);
}
@@ -321,7 +321,7 @@ public class PropertyConverter {
}
private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties, String endpoint) {
- if (!S3Util.isHdfsOnOssEndpoint(endpoint)) {
+ if (!LocationPath.isHdfsOnOssEndpoint(endpoint)) {
// just for robustness here, avoid wrong endpoint when oss-hdfs is enabled.
// convert "oss-cn-beijing.aliyuncs.com" to "cn-beijing.oss-dls.aliyuncs.com"
// reference link: https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index e54a73bbff3..63f552a8ab8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -18,9 +18,6 @@
package org.apache.doris.fs;
import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.S3Util;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
@@ -28,12 +25,10 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.fs.remote.dfs.JFSFileSystem;
import org.apache.doris.fs.remote.dfs.OFSFileSystem;
-import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -56,35 +51,6 @@ public class FileSystemFactory {
}
}
- public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
- FileSystemType fsType;
- if (bindBrokerName != null) {
- fsType = FileSystemType.BROKER;
- } else if (S3Util.isObjStorage(location)) {
- if (S3Util.isHdfsOnOssEndpoint(location)) {
- // if hdfs service is enabled on oss, use hdfs lib to access oss.
- fsType = FileSystemType.DFS;
- } else {
- fsType = FileSystemType.S3;
- }
- } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)
- || location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) {
- fsType = FileSystemType.DFS;
- } else if (location.startsWith(FeConstants.FS_PREFIX_OFS) || location.startsWith(FeConstants.FS_PREFIX_COSN)) {
- // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
- fsType = FileSystemType.OFS;
- } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
- fsType = FileSystemType.JFS;
- } else {
- throw new UnsupportedOperationException("Unknown file system for location: " + location);
- }
-
- Path path = new Path(location);
- URI uri = path.toUri();
- String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
- return Pair.of(fsType, fsIdent);
- }
-
public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
String bindBrokerName) {
Map<String, String> properties = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index eac945ced35..ada9f1fda61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -30,11 +30,9 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
-import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
@@ -83,7 +81,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
/**
@@ -492,33 +489,4 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected abstract TableIf getTargetTable() throws UserException;
protected abstract Map<String, String> getLocationProperties() throws UserException;
-
- protected static Optional<TFileType> getTFileType(String location) {
- if (location != null && !location.isEmpty()) {
- if (S3Util.isObjStorage(location)) {
- if (S3Util.isHdfsOnOssEndpoint(location)) {
- // if hdfs service is enabled on oss, use hdfs lib to access oss.
- return Optional.of(TFileType.FILE_HDFS);
- }
- return Optional.of(TFileType.FILE_S3);
- } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
- return Optional.of(TFileType.FILE_LOCAL);
- } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- }
- }
- return Optional.empty();
- }
}
-
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 943d30017e7..fbb7df4df2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -66,6 +67,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
@@ -334,7 +336,7 @@ public class HiveScanNode extends FileQueryScanNode {
if (bindBrokerName != null) {
return TFileType.FILE_BROKER;
}
- return getTFileType(location).orElseThrow(() ->
+ return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 3921eb5cb89..f73947262e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -26,7 +26,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -43,7 +43,6 @@ import org.apache.doris.thrift.THudiFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -287,9 +286,11 @@ public class HudiScanNode extends HiveScanNode {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
- splits.add(new FileSplit(S3Util.toScanRangeLocation(filePath, Maps.newHashMap()),
- 0, fileSize, fileSize, new String[0],
- partition.getPartitionValues()));
+ // Need to add the hdfs host to the location
+ LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+ Path splitFilePath = locationPath.toScanRangeLocation();
+ splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+ new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index c8a9437e243..0602da4a34d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -31,7 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.S3Util;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.external.iceberg.util.IcebergUtils;
@@ -141,7 +141,9 @@ public class IcebergScanNode extends FileQueryScanNode {
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
String deleteFilePath = filter.getDeleteFilePath();
- deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
+ LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig());
+ Path splitDeletePath = locationPath.toScanRangeLocation();
+ deleteFileDesc.setPath(splitDeletePath.toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -221,8 +223,8 @@ public class IcebergScanNode extends FileQueryScanNode {
// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
-
- Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
+ LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties());
+ Path finalDataFilePath = locationPath.toScanRangeLocation();
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
splitTask.start(),
@@ -323,7 +325,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws UserException {
final String fLocation = normalizeLocation(location);
- return getTFileType(fLocation).orElseThrow(() ->
+ return Optional.ofNullable(LocationPath.getTFileType(fLocation)).orElseThrow(() ->
new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
new file mode 100644
index 00000000000..277b6527a4f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.fs.FileSystemType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocationPathTest {
+
+ @Test
+ public void testHdfsLocationConvert() {
+ // non HA
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("hdfs://dir/file.path", rangeProps);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
+
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("hdfs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
+
+ // HA props
+ Map<String, String> props = new HashMap<>();
+ props.put("dfs.nameservices", "ns");
+ locationPath = new LocationPath("hdfs:///dir/file.path", props);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://")
+ && !locationPath.get().startsWith("hdfs:///"));
+
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("hdfs://") && !beLocation.startsWith("hdfs:///"));
+
+ // nonstandard '/' for hdfs path
+ locationPath = new LocationPath("hdfs:/dir/file.path", props);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
+
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("hdfs://"));
+
+ // empty ha nameservices
+ props.put("dfs.nameservices", "");
+ locationPath = new LocationPath("hdfs:/dir/file.path", props);
+
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(locationPath.get().startsWith("/dir")
+ && !locationPath.get().startsWith("hdfs://"));
+ Assertions.assertTrue(beLocation.startsWith("/dir") && !beLocation.startsWith("hdfs://"));
+ }
+
+
+ @Test
+ public void testJFSLocationConvert() {
+ String loc;
+ Map<String, String> rangeProps = new HashMap<>();
+
+ LocationPath locationPath = new LocationPath("jfs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("jfs://"));
+ // BE
+ loc = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(loc.startsWith("jfs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, FileSystemType.JFS);
+ }
+
+ @Test
+ public void testGSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+
+ // use s3 client to access gs
+ LocationPath locationPath = new LocationPath("gs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("s3://"));
+ // BE
+ String beLoc = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLoc.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, FileSystemType.S3);
+ }
+
+ @Test
+ public void testOSSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("oss://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("oss://"));
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
+
+ locationPath = new LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
+
+ }
+
+ @Test
+ public void testCOSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("cos://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("cos://"));
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ // BE
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
+
+ locationPath = new LocationPath("cosn://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("cosn://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS);
+
+ locationPath = new LocationPath("ofs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("ofs://"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("ofs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS);
+
+ // GFS is now treated the same as DFS
+ locationPath = new LocationPath("gfs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("gfs://"));
+ // BE
+ beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("gfs://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
+ }
+
+ @Test
+ public void testOBSLocationConvert() {
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("obs://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("obs://"));
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("s3://"));
+ Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
+ }
+
+ @Test
+ public void testUnsupportedLocationConvert() {
+ // when use unknown location, pass to BE
+ Map<String, String> rangeProps = new HashMap<>();
+ LocationPath locationPath = new LocationPath("unknown://test.com", rangeProps);
+ // FE
+ Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
+ // BE
+ String beLocation = locationPath.toScanRangeLocation().toString();
+ Assertions.assertTrue(beLocation.startsWith("unknown://"));
+ }
+}
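A small companion sketch of the HDFS normalization exercised by testHdfsLocationConvert above (illustrative only, not part of the patch): with dfs.nameservices set, a host-less path gains the name service host, and with it empty the scheme is stripped so BE falls back to the local NameNode:

    import org.apache.doris.common.util.LocationPath;

    import java.util.HashMap;
    import java.util.Map;

    public class HdfsNormalizationSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("dfs.nameservices", "ns");
            // hdfs:///dir/file.path -> a location starting with hdfs://ns
            System.out.println(new LocationPath("hdfs:///dir/file.path", props).get());
            // hdfs:/dir/file.path with empty nameservices -> /dir/file.path
            props.put("dfs.nameservices", "");
            System.out.println(new LocationPath("hdfs:/dir/file.path", props).get());
        }
    }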
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java
deleted file mode 100644
index 70bad23e01f..00000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3UtilTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.util;
-
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.FileSystemType;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class S3UtilTest {
- @Test
- public void testLocationConvert() {
- String loc;
- loc = S3Util.convertToS3IfNecessary("hdfs://dir/file.path", new HashMap<>());
- Assertions.assertTrue(loc.startsWith("hdfs://"));
-
- Map<String, String> props = new HashMap<>();
- props.put("dfs.nameservices", "ns");
- loc = S3Util.convertToS3IfNecessary("hdfs:///dir/file.path", props);
- Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///"));
- loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props);
- Assertions.assertTrue(loc.startsWith("hdfs://"));
- props.put("dfs.nameservices", "");
- loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props);
- Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://"));
-
- loc = S3Util.convertToS3IfNecessary("oss://test.com", props);
- Assertions.assertTrue(loc.startsWith("oss://"));
-
- loc = S3Util.convertToS3IfNecessary("gcs://test.com", props);
- Assertions.assertTrue(loc.startsWith("gcs://"));
-
- loc = S3Util.convertToS3IfNecessary("cos://test.com", props);
- Assertions.assertTrue(loc.startsWith("cos://"));
-
- loc = S3Util.convertToS3IfNecessary("cosn://test.com", props);
- Assertions.assertTrue(loc.startsWith("cosn://"));
-
- loc = S3Util.convertToS3IfNecessary("obs://test.com", props);
- Assertions.assertTrue(loc.startsWith("obs://"));
- }
-
-
- @Test
- public void testScanRangeLocationConvert() throws Exception {
- String loc;
- Map<String, String> rangeProps = new HashMap<>();
- loc = S3Util.toScanRangeLocation("hdfs://dir/file.path", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("hdfs://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS);
-
- Map<String, String> props = new HashMap<>();
- props.put("dfs.nameservices", "ns");
- loc = S3Util.toScanRangeLocation("hdfs:///dir/file.path", props).toString();
- Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///"));
- loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString();
- Assertions.assertTrue(loc.startsWith("hdfs://"));
- props.put("dfs.nameservices", "");
- loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString();
- Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://"));
-
- loc = S3Util.toScanRangeLocation("oss://test.com", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("s3://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3);
-
- loc = S3Util.toScanRangeLocation("oss://test.oss-dls.aliyuncs.com/path", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("oss://test.oss-dls.aliyuncs"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS);
-
- loc = S3Util.toScanRangeLocation("cos://test.com", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("s3://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3);
-
- loc = S3Util.toScanRangeLocation("cosn://test.com", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("cosn://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.OFS);
-
- loc = S3Util.toScanRangeLocation("obs://test.com", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("s3://"));
- Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3);
-
- loc = S3Util.toScanRangeLocation("unknown://test.com", rangeProps).toString();
- Assertions.assertTrue(loc.startsWith("unknown://"));
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]