This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f5af735fa6 [fix](multi-catalog)fix obj file cache and dlf iceberg catalog (#21238)
f5af735fa6 is described below

commit f5af735fa65f99ecc60235fd9ac2ab4f0807c9df
Author: slothever <[email protected]>
AuthorDate: Sun Jul 2 21:08:41 2023 +0800

    [fix](multi-catalog)fix obj file cache and dlf iceberg catalog (#21238)
    
    1. fix storage prefix for obj file cache: oss/cos/obs locations don't need to be converted to the s3 prefix; only convert when creating splits
    2. dlf iceberg catalog: support dlf iceberg tables, using the s3 file io.
---
 .../java/org/apache/doris/common/util/S3Util.java  | 97 ++++++++++++++++++++--
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 12 ++-
 .../datasource/iceberg/HiveCompatibleCatalog.java  |  8 +-
 .../doris/datasource/iceberg/dlf/DLFCatalog.java   | 31 +++++++
 .../datasource/property/PropertyConverter.java     | 25 +++---
 .../property/constants/S3Properties.java           |  4 +
 .../org/apache/doris/fs/FileSystemFactory.java     |  5 +-
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java | 65 ++-------------
 .../org/apache/doris/fs/remote/RemoteFile.java     |  4 +
 .../planner/external/iceberg/IcebergScanNode.java  |  7 +-
 10 files changed, 174 insertions(+), 84 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index f1c58feed0..e3ae85461d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -18,30 +18,54 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.credentials.CloudCredential;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
+
+import java.net.URI;
+import java.time.Duration;
 
 public class S3Util {
     private static final Logger LOG = LogManager.getLogger(S3Util.class);
 
     public static boolean isObjStorage(String location) {
-        return isS3CompatibleObjStorage(location) || location.startsWith(FeConstants.FS_PREFIX_OBS);
+        return isObjStorageUseS3Client(location)
+                || location.startsWith(FeConstants.FS_PREFIX_COS)
+                || location.startsWith(FeConstants.FS_PREFIX_OSS)
+                || location.startsWith(FeConstants.FS_PREFIX_OBS);
     }
 
-    private static boolean isS3CompatibleObjStorage(String location) {
+    private static boolean isObjStorageUseS3Client(String location) {
         return location.startsWith(FeConstants.FS_PREFIX_S3)
                 || location.startsWith(FeConstants.FS_PREFIX_S3A)
                 || location.startsWith(FeConstants.FS_PREFIX_S3N)
                 || location.startsWith(FeConstants.FS_PREFIX_GCS)
-                || location.startsWith(FeConstants.FS_PREFIX_BOS)
-                || location.startsWith(FeConstants.FS_PREFIX_COS)
-                || location.startsWith(FeConstants.FS_PREFIX_OSS);
+                || location.startsWith(FeConstants.FS_PREFIX_BOS);
     }
 
-    public static  String convertToS3IfNecessary(String location) {
+    /**
+     * The converted path is used by FE to get metadata.
+     * @param location origin location
+     * @return metadata location path; only converted when the storage is compatible with the s3 client
+     */
+    public static String convertToS3IfNecessary(String location) {
         LOG.debug("try convert location to s3 prefix: " + location);
-        if (isS3CompatibleObjStorage(location)) {
+        if (isObjStorageUseS3Client(location)) {
             int pos = location.indexOf("://");
             if (pos == -1) {
                 throw new RuntimeException("No '://' found in location: " + location);
@@ -51,4 +75,63 @@ public class S3Util {
         return location;
     }
 
+    /**
+     * The converted path is used by BE.
+     * @param location origin split path
+     * @return BE scan range path
+     */
+    public static Path toScanRangeLocation(String location) {
+        // All object storage locations use the s3 client on BE.
+        if (isObjStorage(location)) {
+            int pos = location.indexOf("://");
+            if (pos == -1) {
+                throw new RuntimeException("No '://' found in location: " + location);
+            }
+            location = "s3" + location.substring(pos);
+        }
+        return new Path(location);
+    }
+
+    public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
+        StaticCredentialsProvider scp;
+        AwsCredentials awsCredential;
+        if (!credential.isTemporary()) {
+            awsCredential = AwsBasicCredentials.create(credential.getAccessKey(), credential.getSecretKey());
+        } else {
+            awsCredential = AwsSessionCredentials.create(credential.getAccessKey(), credential.getSecretKey(),
+                        credential.getSessionToken());
+        }
+        scp = StaticCredentialsProvider.create(awsCredential);
+        EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
+                .builder()
+                .baseDelay(Duration.ofSeconds(1))
+                .maxBackoffTime(Duration.ofMinutes(1))
+                .build();
+        // retry 3 times with equal-jitter backoff
+        RetryPolicy retryPolicy = RetryPolicy
+                .builder()
+                .numRetries(3)
+                .backoffStrategy(backoffStrategy)
+                .build();
+        ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
+                .builder()
+                // set retry policy
+                .retryPolicy(retryPolicy)
+                // using AwsS3V4Signer
+                .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
+                .build();
+        return S3Client.builder()
+                .httpClient(UrlConnectionHttpClient.create())
+                .endpointOverride(endpoint)
+                .credentialsProvider(scp)
+                .region(Region.of(region))
+                .overrideConfiguration(clientConf)
+                // disable chunkedEncoding because bos does not support it
+                // use virtual hosted-style access
+                .serviceConfiguration(S3Configuration.builder()
+                        .chunkedEncodingEnabled(false)
+                        .pathStyleAccessEnabled(false)
+                        .build())
+                .build();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d64cdf477b..0267668ed3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -309,7 +309,14 @@ public class HiveMetaStoreCache {
             // So we need to recursively list data location.
             // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
             RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
-            locatedFiles.files().forEach(result::addFile);
+            for (RemoteFile remoteFile : locatedFiles.files()) {
+                Path srcPath = remoteFile.getPath();
+                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+                if (!convertedPath.toString().equals(srcPath.toString())) {
+                    remoteFile.setPath(convertedPath);
+                }
+                result.addFile(remoteFile);
+            }
         } catch (Exception e) {
             // User may manually remove partition under HDFS, in this case,
            // Hive isn't aware that the removed partition is missing.
@@ -362,7 +369,8 @@ public class HiveMetaStoreCache {
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
                         // todo: get modification time
-                        result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
+                        Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+                        result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
                     }
                 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
index 9c31f102a0..4cdba3523d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java
@@ -24,13 +24,13 @@ import org.apache.iceberg.BaseMetastoreCatalog;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import shade.doris.hive.org.apache.thrift.TException;
 
@@ -57,7 +57,11 @@ public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog impleme
     protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
         String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
         if (fileIOImpl == null) {
-            FileIO io = new S3FileIO();
+            /* When using S3FileIO we need some custom configuration,
+             * so HadoopFileIO is used in the superclass by default.
+             * Derived classes can provide better implementations, as DLFCatalog does.
+             */
+            FileIO io = new HadoopFileIO(hadoopConf);
             io.initialize(properties);
             return io;
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
index 940a437dbe..24f2df5acd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java
@@ -17,12 +17,21 @@
 
 package org.apache.doris.datasource.iceberg.dlf;
 
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.credentials.CloudCredential;
 import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog;
 import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.aliyun.oss.Constants;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.FileIO;
 
+import java.net.URI;
 import java.util.Map;
 
 public class DLFCatalog extends HiveCompatibleCatalog {
@@ -38,4 +47,26 @@ public class DLFCatalog extends HiveCompatibleCatalog {
         String tableName = tableIdentifier.name();
         return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName);
     }
+
+    protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
+        // read from the converted oss properties, or fall back to the old aws s3 properties
+        String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT));
+        CloudCredential credential = new CloudCredential();
+        credential.setAccessKey(properties.getOrDefault(OssProperties.ACCESS_KEY,
+                    properties.get(S3Properties.Env.ACCESS_KEY)));
+        credential.setSecretKey(properties.getOrDefault(OssProperties.SECRET_KEY,
+                    properties.get(S3Properties.Env.SECRET_KEY)));
+        if (properties.containsKey(OssProperties.SESSION_TOKEN)
+                || properties.containsKey(S3Properties.Env.TOKEN)) {
+            credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN,
+                    properties.get(S3Properties.Env.TOKEN)));
+        }
+        String region = properties.getOrDefault(OssProperties.REGION, properties.get(S3Properties.Env.REGION));
+        // the s3 file io only supports s3-style endpoints
+        String s3Endpoint = endpoint.replace(region, "s3." + region);
+        URI endpointUri = URI.create(s3Endpoint);
+        FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential));
+        io.initialize(properties);
+        return io;
+    }
 }
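For context on the endpoint rewrite in initializeFileIO above, a minimal sketch; the region and endpoint values are illustrative assumptions following the usual aliyuncs.com naming, not values taken from this patch:

    // Assume PropertyConverter has set OssProperties.REGION to "oss-cn-beijing"
    // and the oss endpoint to the matching "oss-cn-beijing.aliyuncs.com".
    String region = "oss-cn-beijing";
    String endpoint = "oss-cn-beijing.aliyuncs.com";
    // Prefixing the region with "s3." yields the s3-style endpoint that
    // S3FileIO expects: "s3.oss-cn-beijing.aliyuncs.com"
    String s3Endpoint = endpoint.replace(region, "s3." + region);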
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
index 1c4769d206..094871d9f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
@@ -314,29 +314,28 @@ public class PropertyConverter {
         if (Strings.isNullOrEmpty(uid)) {
             throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
         }
-        // access OSS by AWS client, so set s3 parameters
-        getAWSPropertiesFromDLFConf(props, hiveConf);
+        getOSSPropertiesFromDLFConf(props, hiveConf);
     }
 
-    private static void getAWSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
+    private static void getOSSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
         // get following properties from hive-site.xml
         // 1. region and endpoint. eg: cn-beijing
         String region = hiveConf.get(DataLakeConfig.CATALOG_REGION_ID);
         if (!Strings.isNullOrEmpty(region)) {
             // See: https://help.aliyun.com/document_detail/31837.html
             // And add "-internal" to access oss within vpc
-            props.put(S3Properties.REGION, "oss-" + region);
+            props.put(OssProperties.REGION, "oss-" + region);
             String publicAccess = hiveConf.get("dlf.catalog.accessPublic", "false");
-            props.put(S3Properties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
+            props.put(OssProperties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
         }
         // 2. ak and sk
         String ak = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_ID);
         String sk = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET);
         if (!Strings.isNullOrEmpty(ak)) {
-            props.put(S3Properties.ACCESS_KEY, ak);
+            props.put(OssProperties.ACCESS_KEY, ak);
         }
         if (!Strings.isNullOrEmpty(sk)) {
-            props.put(S3Properties.SECRET_KEY, sk);
+            props.put(OssProperties.SECRET_KEY, sk);
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("Get properties for oss in hive-site.xml: {}", props);
@@ -369,19 +368,19 @@ public class PropertyConverter {
         if (Strings.isNullOrEmpty(uid)) {
             throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
         }
-        // convert to s3 client property
+        // convert to oss property
         if (credential.isWhole()) {
-            props.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
-            props.put(S3Properties.SECRET_KEY, credential.getSecretKey());
+            props.put(OssProperties.ACCESS_KEY, credential.getAccessKey());
+            props.put(OssProperties.SECRET_KEY, credential.getSecretKey());
         }
         if (credential.isTemporary()) {
-            props.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
+            props.put(OssProperties.SESSION_TOKEN, credential.getSessionToken());
         }
         String publicAccess = props.getOrDefault(DLFProperties.Site.ACCESS_PUBLIC, "false");
         String region = props.getOrDefault(DataLakeConfig.CATALOG_REGION_ID, props.get(DLFProperties.REGION));
         if (!Strings.isNullOrEmpty(region)) {
-            props.put(S3Properties.REGION, "oss-" + region);
-            props.put(S3Properties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
+            props.put(OssProperties.REGION, "oss-" + region);
+            props.put(OssProperties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 945cfc386f..b5a96ddcec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -126,6 +126,10 @@ public class S3Properties extends BaseProperties {
         if (endpointSplit.length < 2) {
             return null;
         }
+        if (endpointSplit[0].startsWith("oss-")) {
+            // compatible with endpoints like oss-cn-beijing.aliyuncs.com
+            return endpointSplit[0];
+        }
         return endpointSplit[1];
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index a27ccd88de..5ca3e1f21f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -56,7 +56,10 @@ public class FileSystemFactory {
         // TODO: need optimize the method. the conf is converted many times.
         Map<String, String> properties = new HashMap<>();
         conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
-        if (location.startsWith(FeConstants.FS_PREFIX_S3) || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
+        if (location.startsWith(FeConstants.FS_PREFIX_S3)
+                || location.startsWith(FeConstants.FS_PREFIX_OSS)
+                || location.startsWith(FeConstants.FS_PREFIX_COS)
+                || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
             return new S3FileSystem(properties);
         } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) {
             return new DFSFileSystem(properties);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 2bc59865ce..930988e54b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -21,6 +21,8 @@ import org.apache.doris.backup.Status;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.S3URI;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.credentials.CloudCredential;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
 
@@ -31,19 +33,8 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
-import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
-import software.amazon.awssdk.core.retry.RetryPolicy;
-import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
 import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3Configuration;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@@ -61,7 +52,6 @@ import software.amazon.awssdk.services.s3.model.S3Object;
 
 import java.io.File;
 import java.net.URI;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -132,52 +122,15 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
     public S3Client getClient(String bucket) throws UserException {
         if (client == null) {
             URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT));
-            StaticCredentialsProvider scp;
-            if (!properties.containsKey(S3Properties.SESSION_TOKEN)) {
-                AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
-                        properties.get(S3Properties.ACCESS_KEY),
-                        properties.get(S3Properties.SECRET_KEY));
-                scp = StaticCredentialsProvider.create(awsBasic);
-            } else {
-                AwsSessionCredentials awsSession = AwsSessionCredentials.create(
-                        properties.get(S3Properties.ACCESS_KEY),
-                        properties.get(S3Properties.SECRET_KEY),
-                        properties.get(S3Properties.SESSION_TOKEN));
-                scp = StaticCredentialsProvider.create(awsSession);
-            }
-            EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
-                    .builder()
-                    .baseDelay(Duration.ofSeconds(1))
-                    .maxBackoffTime(Duration.ofMinutes(1))
-                    .build();
-            // retry 3 time with Equal backoff
-            RetryPolicy retryPolicy = RetryPolicy
-                    .builder()
-                    .numRetries(3)
-                    .backoffStrategy(backoffStrategy)
-                    .build();
-            ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
-                    .builder()
-                    // set retry policy
-                    .retryPolicy(retryPolicy)
-                    // using AwsS3V4Signer
-                    .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
-                    .build();
             URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
                     URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString());
-            client = S3Client.builder()
-                    .httpClient(UrlConnectionHttpClient.create())
-                    .endpointOverride(endpoint)
-                    .credentialsProvider(scp)
-                    .region(Region.of(properties.get(S3Properties.REGION)))
-                    .overrideConfiguration(clientConf)
-                    // disable chunkedEncoding because of bos not supported
-                    // use virtual hosted-style access
-                    .serviceConfiguration(S3Configuration.builder()
-                            .chunkedEncodingEnabled(false)
-                            .pathStyleAccessEnabled(false)
-                            .build())
-                    .build();
+            CloudCredential credential = new CloudCredential();
+            credential.setAccessKey(properties.get(S3Properties.ACCESS_KEY));
+            credential.setSecretKey(properties.get(S3Properties.SECRET_KEY));
+            if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
+                credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
+            }
+            client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), credential);
         }
         return client;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index a8d918cffa..328247e814 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -75,6 +75,10 @@ public class RemoteFile {
         return path;
     }
 
+    public void setPath(Path path) {
+        this.path = path;
+    }
+
     public boolean isFile() {
         return isFile;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 09320e6f59..bc23cec092 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -120,7 +120,8 @@ public class IcebergScanNode extends FileQueryScanNode {
         } else {
             for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
-                deleteFileDesc.setPath(filter.getDeleteFilePath());
+                String deleteFilePath = filter.getDeleteFilePath();
+                deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
                 if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                     fileDesc.setContent(FileContent.POSITION_DELETES.id());
                     IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -182,8 +183,8 @@ public class IcebergScanNode extends FileQueryScanNode {
             long fileSize = task.file().fileSizeInBytes();
             for (FileScanTask splitTask : task.split(splitSize)) {
                 String dataFilePath = splitTask.file().path().toString();
-                String finalDataFilePath = S3Util.convertToS3IfNecessary(dataFilePath);
-                IcebergSplit split = new IcebergSplit(new Path(finalDataFilePath), splitTask.start(),
+                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+                IcebergSplit split = new IcebergSplit(finalDataFilePath, splitTask.start(),
                         splitTask.length(), fileSize, new String[0]);
                 split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {

