This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f5af735fa6 [fix](multi-catalog)fix obj file cache and dlf iceberg catalog (#21238)
f5af735fa6 is described below
commit f5af735fa65f99ecc60235fd9ac2ab4f0807c9df
Author: slothever <[email protected]>
AuthorDate: Sun Jul 2 21:08:41 2023 +0800
[fix](multi-catalog)fix obj file cache and dlf iceberg catalog (#21238)
1. Fix the storage prefix for the obj file cache: oss/cos/obs locations do not need to be converted to the s3 prefix; convert only when creating a split.
2. DLF Iceberg catalog: support DLF Iceberg tables, using S3 FileIO.
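For illustration, a minimal sketch of the conversions this implies (bucket and path are hypothetical, not part of this commit):

    // FE metadata access keeps the native prefix; only s3/s3a/s3n/gcs/bos are rewritten:
    S3Util.convertToS3IfNecessary("oss://bucket/db/tbl"); // -> "oss://bucket/db/tbl"
    S3Util.convertToS3IfNecessary("s3a://bucket/db/tbl"); // -> "s3://bucket/db/tbl"
    // BE scan ranges always use the s3 prefix for object storage:
    S3Util.toScanRangeLocation("oss://bucket/db/tbl"); // -> Path("s3://bucket/db/tbl")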
---
.../java/org/apache/doris/common/util/S3Util.java | 97 ++++++++++++++++++++--
.../doris/datasource/hive/HiveMetaStoreCache.java | 12 ++-
.../datasource/iceberg/HiveCompatibleCatalog.java | 8 +-
.../doris/datasource/iceberg/dlf/DLFCatalog.java | 31 +++++++
.../datasource/property/PropertyConverter.java | 25 +++---
.../property/constants/S3Properties.java | 4 +
.../org/apache/doris/fs/FileSystemFactory.java | 5 +-
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 65 ++-------------
.../org/apache/doris/fs/remote/RemoteFile.java | 4 +
.../planner/external/iceberg/IcebergScanNode.java | 7 +-
10 files changed, 174 insertions(+), 84 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index f1c58feed0..e3ae85461d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -18,30 +18,54 @@
package org.apache.doris.common.util;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.credentials.CloudCredential;
+import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+
+import java.net.URI;
+import java.time.Duration;
public class S3Util {
private static final Logger LOG = LogManager.getLogger(S3Util.class);
public static boolean isObjStorage(String location) {
- return isS3CompatibleObjStorage(location) || location.startsWith(FeConstants.FS_PREFIX_OBS);
+ return isObjStorageUseS3Client(location)
+ || location.startsWith(FeConstants.FS_PREFIX_COS)
+ || location.startsWith(FeConstants.FS_PREFIX_OSS)
+ || location.startsWith(FeConstants.FS_PREFIX_OBS);
}
- private static boolean isS3CompatibleObjStorage(String location) {
+ private static boolean isObjStorageUseS3Client(String location) {
return location.startsWith(FeConstants.FS_PREFIX_S3)
|| location.startsWith(FeConstants.FS_PREFIX_S3A)
|| location.startsWith(FeConstants.FS_PREFIX_S3N)
|| location.startsWith(FeConstants.FS_PREFIX_GCS)
- || location.startsWith(FeConstants.FS_PREFIX_BOS)
- || location.startsWith(FeConstants.FS_PREFIX_COS)
- || location.startsWith(FeConstants.FS_PREFIX_OSS);
+ || location.startsWith(FeConstants.FS_PREFIX_BOS);
}
- public static String convertToS3IfNecessary(String location) {
+ /**
+ * The converted path is used by FE to get metadata
+ * @param location origin location
+ * @return metadata location path; only converted when the storage is compatible with the s3 client
+ */
+ public static String convertToS3IfNecessary(String location) {
LOG.debug("try convert location to s3 prefix: " + location);
- if (isS3CompatibleObjStorage(location)) {
+ if (isObjStorageUseS3Client(location)) {
int pos = location.indexOf("://");
if (pos == -1) {
throw new RuntimeException("No '://' found in location: " + location);
@@ -51,4 +75,63 @@ public class S3Util {
return location;
}
+ /**
+ * The converted path is used by BE
+ * @param location origin split path
+ * @return BE scan range path
+ */
+ public static Path toScanRangeLocation(String location) {
+ // All object storage will use the s3 client on BE.
+ if (isObjStorage(location)) {
+ int pos = location.indexOf("://");
+ if (pos == -1) {
+ throw new RuntimeException("No '://' found in location: " + location);
+ }
+ location = "s3" + location.substring(pos);
+ }
+ return new Path(location);
+ }
+
+ public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
+ StaticCredentialsProvider scp;
+ AwsCredentials awsCredential;
+ if (!credential.isTemporary()) {
+ awsCredential = AwsBasicCredentials.create(credential.getAccessKey(), credential.getSecretKey());
+ } else {
+ awsCredential = AwsSessionCredentials.create(credential.getAccessKey(), credential.getSecretKey(),
+ credential.getSessionToken());
+ }
+ scp = StaticCredentialsProvider.create(awsCredential);
+ EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
+ .builder()
+ .baseDelay(Duration.ofSeconds(1))
+ .maxBackoffTime(Duration.ofMinutes(1))
+ .build();
+ // retry 3 times with equal-jitter backoff
+ RetryPolicy retryPolicy = RetryPolicy
+ .builder()
+ .numRetries(3)
+ .backoffStrategy(backoffStrategy)
+ .build();
+ ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
+ .builder()
+ // set retry policy
+ .retryPolicy(retryPolicy)
+ // using AwsS3V4Signer
+ .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
+ .build();
+ return S3Client.builder()
+ .httpClient(UrlConnectionHttpClient.create())
+ .endpointOverride(endpoint)
+ .credentialsProvider(scp)
+ .region(Region.of(region))
+ .overrideConfiguration(clientConf)
+ // disable chunkedEncoding because bos does not support it
+ // use virtual hosted-style access
+ .serviceConfiguration(S3Configuration.builder()
+ .chunkedEncodingEnabled(false)
+ .pathStyleAccessEnabled(false)
+ .build())
+ .build();
+ }
}
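As a reference, a hypothetical caller of the new helper (endpoint, region, and keys below are made up for illustration):

    CloudCredential credential = new CloudCredential();
    credential.setAccessKey("ak");
    credential.setSecretKey("sk");
    S3Client client = S3Util.buildS3Client(
            URI.create("http://s3.oss-cn-beijing.aliyuncs.com"), "oss-cn-beijing", credential);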
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d64cdf477b..0267668ed3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -309,7 +309,14 @@ public class HiveMetaStoreCache {
// So we need to recursively list data location.
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
- locatedFiles.files().forEach(result::addFile);
+ for (RemoteFile remoteFile : locatedFiles.files()) {
+ Path srcPath = remoteFile.getPath();
+ Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+ if (!convertedPath.toString().equals(srcPath.toString())) {
+ remoteFile.setPath(convertedPath);
+ }
+ result.addFile(remoteFile);
+ }
} catch (Exception e) {
// User may manually remove partition under HDFS, in this case,
// Hive doesn't aware that the removed partition is missing.
@@ -362,7 +369,8 @@ public class HiveMetaStoreCache {
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
// todo: get modification time
- result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
+ Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+ result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
index 9c31f102a0..4cdba3523d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
@@ -24,13 +24,13 @@ import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import shade.doris.hive.org.apache.thrift.TException;
@@ -57,7 +57,11 @@ public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog impleme
protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
if (fileIOImpl == null) {
- FileIO io = new S3FileIO();
+ /* When using S3FileIO, we need some custom configuration,
+ * so HadoopFileIO is used in this superclass by default.
+ * Derived classes can add better implementations, as DLFCatalog does.
+ */
+ FileIO io = new HadoopFileIO(hadoopConf);
io.initialize(properties);
return io;
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
index 940a437dbe..24f2df5acd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
@@ -17,12 +17,21 @@
package org.apache.doris.datasource.iceberg.dlf;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog;
import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.aliyun.oss.Constants;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
+import java.net.URI;
import java.util.Map;
public class DLFCatalog extends HiveCompatibleCatalog {
@@ -38,4 +47,26 @@ public class DLFCatalog extends HiveCompatibleCatalog {
String tableName = tableIdentifier.name();
return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName);
}
+
+ protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
+ // read from the converted oss properties, falling back to the old aws s3 properties
+ String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT));
+ CloudCredential credential = new CloudCredential();
+ credential.setAccessKey(properties.getOrDefault(OssProperties.ACCESS_KEY,
+ properties.get(S3Properties.Env.ACCESS_KEY)));
+ credential.setSecretKey(properties.getOrDefault(OssProperties.SECRET_KEY,
+ properties.get(S3Properties.Env.SECRET_KEY)));
+ if (properties.containsKey(OssProperties.SESSION_TOKEN)
+ || properties.containsKey(S3Properties.Env.TOKEN)) {
+ credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN,
+ properties.get(S3Properties.Env.TOKEN)));
+ }
+ String region = properties.getOrDefault(OssProperties.REGION, properties.get(S3Properties.Env.REGION));
+ // s3 file io only supports s3-style endpoints
+ String s3Endpoint = endpoint.replace(region, "s3." + region);
+ URI endpointUri = URI.create(s3Endpoint);
+ FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential));
+ io.initialize(properties);
+ return io;
+ }
}
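For example (values illustrative): with region "oss-cn-beijing" and endpoint "oss-cn-beijing.aliyuncs.com",

    endpoint.replace(region, "s3." + region) // -> "s3.oss-cn-beijing.aliyuncs.com"

produces the s3-style endpoint that S3FileIO expects.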
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
index 1c4769d206..094871d9f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
@@ -314,29 +314,28 @@ public class PropertyConverter {
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
}
- // access OSS by AWS client, so set s3 parameters
- getAWSPropertiesFromDLFConf(props, hiveConf);
+ getOSSPropertiesFromDLFConf(props, hiveConf);
}
- private static void getAWSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
+ private static void getOSSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
// get following properties from hive-site.xml
// 1. region and endpoint. eg: cn-beijing
String region = hiveConf.get(DataLakeConfig.CATALOG_REGION_ID);
if (!Strings.isNullOrEmpty(region)) {
// See: https://help.aliyun.com/document_detail/31837.html
// And add "-internal" to access oss within vpc
- props.put(S3Properties.REGION, "oss-" + region);
+ props.put(OssProperties.REGION, "oss-" + region);
String publicAccess = hiveConf.get("dlf.catalog.accessPublic", "false");
- props.put(S3Properties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
+ props.put(OssProperties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
}
// 2. ak and sk
String ak = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_ID);
String sk = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET);
if (!Strings.isNullOrEmpty(ak)) {
- props.put(S3Properties.ACCESS_KEY, ak);
+ props.put(OssProperties.ACCESS_KEY, ak);
}
if (!Strings.isNullOrEmpty(sk)) {
- props.put(S3Properties.SECRET_KEY, sk);
+ props.put(OssProperties.SECRET_KEY, sk);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Get properties for oss in hive-site.xml: {}", props);
@@ -369,19 +368,19 @@ public class PropertyConverter {
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
}
- // convert to s3 client property
+ // convert to oss property
if (credential.isWhole()) {
- props.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
- props.put(S3Properties.SECRET_KEY, credential.getSecretKey());
+ props.put(OssProperties.ACCESS_KEY, credential.getAccessKey());
+ props.put(OssProperties.SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
- props.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
+ props.put(OssProperties.SESSION_TOKEN, credential.getSessionToken());
}
String publicAccess = props.getOrDefault(DLFProperties.Site.ACCESS_PUBLIC, "false");
String region = props.getOrDefault(DataLakeConfig.CATALOG_REGION_ID, props.get(DLFProperties.REGION));
if (!Strings.isNullOrEmpty(region)) {
- props.put(S3Properties.REGION, "oss-" + region);
- props.put(S3Properties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
+ props.put(OssProperties.REGION, "oss-" + region);
+ props.put(OssProperties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 945cfc386f..b5a96ddcec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -126,6 +126,10 @@ public class S3Properties extends BaseProperties {
if (endpointSplit.length < 2) {
return null;
}
+ if (endpointSplit[0].startsWith("oss-")) {
+ // compatible with the endpoint: oss-cn-beijing.aliyuncs.com
+ return endpointSplit[0];
+ }
return endpointSplit[1];
}
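For example, assuming the endpoint string is split on "." earlier in this method (the split itself is outside this hunk): "oss-cn-beijing.aliyuncs.com" splits into ["oss-cn-beijing", "aliyuncs", "com"], so the new branch returns "oss-cn-beijing", while an AWS-style endpoint such as "s3.us-east-1.amazonaws.com" still returns endpointSplit[1], i.e. "us-east-1".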
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index a27ccd88de..5ca3e1f21f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -56,7 +56,10 @@ public class FileSystemFactory {
// TODO: need optimize the method. the conf is converted many times.
Map<String, String> properties = new HashMap<>();
conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
- if (location.startsWith(FeConstants.FS_PREFIX_S3) || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
+ if (location.startsWith(FeConstants.FS_PREFIX_S3)
+ || location.startsWith(FeConstants.FS_PREFIX_OSS)
+ || location.startsWith(FeConstants.FS_PREFIX_COS)
+ || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
return new S3FileSystem(properties);
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) {
return new DFSFileSystem(properties);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 2bc59865ce..930988e54b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -21,6 +21,8 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
@@ -31,19 +33,8 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
-import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
-import software.amazon.awssdk.core.retry.RetryPolicy;
-import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -61,7 +52,6 @@ import software.amazon.awssdk.services.s3.model.S3Object;
import java.io.File;
import java.net.URI;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -132,52 +122,15 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
public S3Client getClient(String bucket) throws UserException {
if (client == null) {
URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT));
- StaticCredentialsProvider scp;
- if (!properties.containsKey(S3Properties.SESSION_TOKEN)) {
- AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
- properties.get(S3Properties.ACCESS_KEY),
- properties.get(S3Properties.SECRET_KEY));
- scp = StaticCredentialsProvider.create(awsBasic);
- } else {
- AwsSessionCredentials awsSession = AwsSessionCredentials.create(
- properties.get(S3Properties.ACCESS_KEY),
- properties.get(S3Properties.SECRET_KEY),
- properties.get(S3Properties.SESSION_TOKEN));
- scp = StaticCredentialsProvider.create(awsSession);
- }
- EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
- .builder()
- .baseDelay(Duration.ofSeconds(1))
- .maxBackoffTime(Duration.ofMinutes(1))
- .build();
- // retry 3 time with Equal backoff
- RetryPolicy retryPolicy = RetryPolicy
- .builder()
- .numRetries(3)
- .backoffStrategy(backoffStrategy)
- .build();
- ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
- .builder()
- // set retry policy
- .retryPolicy(retryPolicy)
- // using AwsS3V4Signer
- .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
- .build();
URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString());
- client = S3Client.builder()
- .httpClient(UrlConnectionHttpClient.create())
- .endpointOverride(endpoint)
- .credentialsProvider(scp)
- .region(Region.of(properties.get(S3Properties.REGION)))
- .overrideConfiguration(clientConf)
- // disable chunkedEncoding because of bos not supported
- // use virtual hosted-style access
- .serviceConfiguration(S3Configuration.builder()
- .chunkedEncodingEnabled(false)
- .pathStyleAccessEnabled(false)
- .build())
- .build();
+ CloudCredential credential = new CloudCredential();
+ credential.setAccessKey(properties.get(S3Properties.ACCESS_KEY));
+ credential.setSecretKey(properties.get(S3Properties.SECRET_KEY));
+ if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
+ credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
+ }
+ client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), credential);
}
return client;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index a8d918cffa..328247e814 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -75,6 +75,10 @@ public class RemoteFile {
return path;
}
+ public void setPath(Path path) {
+ this.path = path;
+ }
+
public boolean isFile() {
return isFile;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 09320e6f59..bc23cec092 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -120,7 +120,8 @@ public class IcebergScanNode extends FileQueryScanNode {
} else {
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
- deleteFileDesc.setPath(filter.getDeleteFilePath());
+ String deleteFilePath = filter.getDeleteFilePath();
+ deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -182,8 +183,8 @@ public class IcebergScanNode extends FileQueryScanNode {
long fileSize = task.file().fileSizeInBytes();
for (FileScanTask splitTask : task.split(splitSize)) {
String dataFilePath = splitTask.file().path().toString();
- String finalDataFilePath = S3Util.convertToS3IfNecessary(dataFilePath);
- IcebergSplit split = new IcebergSplit(new Path(finalDataFilePath), splitTask.start(),
+ Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+ IcebergSplit split = new IcebergSplit(finalDataFilePath, splitTask.start(),
splitTask.length(), fileSize, new String[0]);
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]