This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba38973  use virtual hosted-style request to access object store 
(#5894)
ba38973 is described below

commit ba38973209e70312e052f0220ef783180a1f4442
Author: Zhengguo Yang <[email protected]>
AuthorDate: Thu May 27 15:52:07 2021 +0800

    use virtual hosted-style request to access object store (#5894)
    
    * use virtual hosted-style access request object store
---
 be/src/common/daemon.cpp                           |  18 +--
 be/src/exec/s3_reader.cpp                          |   4 +-
 be/src/exec/s3_reader.h                            |   2 +-
 be/src/exec/s3_writer.cpp                          |  12 +-
 be/src/exec/s3_writer.h                            |   2 +-
 be/src/util/s3_storage_backend.cpp                 |   2 +-
 be/src/util/s3_storage_backend.h                   |   2 +-
 be/src/util/s3_util.cpp                            |  56 ++++---
 be/src/util/s3_util.h                              |  14 ++
 be/test/exec/s3_reader_test.cpp                    |  15 +-
 .../java/org/apache/doris/backup/S3Storage.java    | 167 ++++++++++++---------
 .../java/org/apache/doris/common/util/S3URI.java   |  28 +++-
 .../org/apache/doris/persist/gson/GsonUtils.java   |   4 +-
 13 files changed, 192 insertions(+), 134 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5b21ea0..d70ee07 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -17,11 +17,9 @@
 
 #include "common/daemon.h"
 
-#include <signal.h>
-
-#include <aws/core/Aws.h>
 #include <gflags/gflags.h>
 #include <gperftools/malloc_extension.h>
+#include <signal.h>
 
 #include "common/config.h"
 #include "exprs/bitmap_function.h"
@@ -68,8 +66,6 @@ namespace doris {
 
 bool k_doris_exit = false;
 
-Aws::SDKOptions aws_options;
-
 void Daemon::tcmalloc_gc_thread() {
     while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(10))) {
         size_t used_size = 0;
@@ -268,14 +264,6 @@ void Daemon::init(int argc, char** argv, const 
std::vector<StorePath>& paths) {
     HllFunctions::init();
     HashFunctions::init();
     TopNFunctions::init();
-    // disable EC2 metadata service
-    setenv("AWS_EC2_METADATA_DISABLED", "true", false);
-    Aws::Utils::Logging::LogLevel logLevel = 
static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
-    aws_options.loggingOptions.logLevel = logLevel;
-    aws_options.loggingOptions.logger_create_fn = [logLevel] {
-        return std::make_shared<DorisAWSLogger>(logLevel);
-    };
-    Aws::InitAPI(aws_options);
 
     LOG(INFO) << CpuInfo::debug_string();
     LOG(INFO) << DiskInfo::debug_string();
@@ -303,7 +291,8 @@ void Daemon::start() {
     if (config::enable_metric_calculator) {
         CHECK(DorisMetrics::instance()->is_inited())
                 << "enable metric calculator failed, maybe you set 
enable_system_metrics to false "
-                << " or there may be some hardware error which causes metric 
init failed, please check log first;"
+                << " or there may be some hardware error which causes metric 
init failed, please "
+                   "check log first;"
                 << " you can set enable_metric_calculator = false to quickly 
recover ";
 
         st = Thread::create(
@@ -325,7 +314,6 @@ void Daemon::stop() {
     if (_calculate_metrics_thread) {
         _calculate_metrics_thread->join();
     }
-    Aws::ShutdownAPI(aws_options);
 }
 
 } // namespace doris
diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp
index ddc71c6..ae5cc3b 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/exec/s3_reader.cpp
@@ -41,8 +41,8 @@ S3Reader::S3Reader(const std::map<std::string, std::string>& 
properties, const s
           _uri(path),
           _cur_offset(start_offset),
           _file_size(0),
-          _closed(false) {
-    _client = create_client(_properties);
+          _closed(false),
+          _client(ClientFactory::instance().create(_properties)) {
     DCHECK(_client) << "init aws s3 client error.";
 }
 
diff --git a/be/src/exec/s3_reader.h b/be/src/exec/s3_reader.h
index cd5c185..1676ba5 100644
--- a/be/src/exec/s3_reader.h
+++ b/be/src/exec/s3_reader.h
@@ -64,6 +64,6 @@ private:
     int64_t _cur_offset;
     int64_t _file_size;
     bool _closed;
-    std::unique_ptr<Aws::S3::S3Client> _client;
+    std::shared_ptr<Aws::S3::S3Client> _client;
 };
 } // end namespace doris
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 557c004..a8cb377 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -41,12 +41,10 @@ S3Writer::S3Writer(const std::map<std::string, 
std::string>& properties, const s
           _path(path),
           _uri(path),
           _sync_needed(false),
-          _temp_file(Aws::MakeShared<Aws::Utils::TempFile>(
-                  "S3WRITER",
-                  // "/tmp/doris_tmp_", "s3tmp",
+          _client(ClientFactory::instance().create(_properties)),
+          _temp_file(std::make_shared<Aws::Utils::TempFile>(
                   std::ios_base::binary | std::ios_base::trunc | 
std::ios_base::in |
-                          std::ios_base::out)) {
-    _client = create_client(_properties);
+                  std::ios_base::out)) {
     DCHECK(_client) << "init aws s3 client error.";
 }
 
@@ -56,8 +54,8 @@ S3Writer::~S3Writer() {
 
 Status S3Writer::open() {
     CHECK_S3_CLIENT(_client);
-    if (!_uri.parse()) {                                             
-        return Status::InvalidArgument("s3 uri is invalid: " + _path); 
+    if (!_uri.parse()) {
+        return Status::InvalidArgument("s3 uri is invalid: " + _path);
     }
     Aws::S3::Model::HeadObjectRequest request;
     request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
diff --git a/be/src/exec/s3_writer.h b/be/src/exec/s3_writer.h
index e070a99..663703f 100644
--- a/be/src/exec/s3_writer.h
+++ b/be/src/exec/s3_writer.h
@@ -53,8 +53,8 @@ private:
     std::string _path;
     S3URI _uri;
     bool _sync_needed;
+    std::shared_ptr<Aws::S3::S3Client> _client;
     std::shared_ptr<Aws::Utils::TempFile> _temp_file;
-    std::unique_ptr<Aws::S3::S3Client> _client;
 };
 
 } // end namespace doris
diff --git a/be/src/util/s3_storage_backend.cpp 
b/be/src/util/s3_storage_backend.cpp
index bd1a7d2..5c786a9 100644
--- a/be/src/util/s3_storage_backend.cpp
+++ b/be/src/util/s3_storage_backend.cpp
@@ -61,7 +61,7 @@ namespace doris {
 
 S3StorageBackend::S3StorageBackend(const std::map<std::string, std::string>& 
prop)
         : _properties(prop) {
-    _client = create_client(_properties);
+    _client = ClientFactory::instance().create(_properties);
     DCHECK(_client) << "init aws s3 client error.";
 }
 
diff --git a/be/src/util/s3_storage_backend.h b/be/src/util/s3_storage_backend.h
index 09c52f8..4518227 100644
--- a/be/src/util/s3_storage_backend.h
+++ b/be/src/util/s3_storage_backend.h
@@ -47,7 +47,7 @@ private:
     template <typename AwsOutcome>
     std::string error_msg(const AwsOutcome& outcome);
     const std::map<std::string, std::string>& _properties;
-    std::unique_ptr<Aws::S3::S3Client> _client;
+    std::shared_ptr<Aws::S3::S3Client> _client;
 };
 
 } // end namespace doris
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 17baa89..364f580 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -17,12 +17,12 @@
 
 #include "util/s3_util.h"
 
-#include <aws/core/Aws.h>
 #include <aws/core/auth/AWSCredentials.h>
 #include <aws/s3/S3Client.h>
 #include <util/string_util.h>
 
-#include "common/logging.h"
+#include "common/config.h"
+#include "util/logging.h"
 
 namespace doris {
 
@@ -31,23 +31,43 @@ const static std::string S3_SK = "AWS_SECRET_KEY";
 const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
 const static std::string S3_REGION = "AWS_REGION";
 
-std::unique_ptr<Aws::S3::S3Client> create_client(const std::map<std::string, 
std::string>& prop) {
+ClientFactory::ClientFactory() {
+    _aws_options = Aws::SDKOptions{};
+    Aws::Utils::Logging::LogLevel logLevel =
+            static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
+    _aws_options.loggingOptions.logLevel = logLevel;
+    _aws_options.loggingOptions.logger_create_fn = [logLevel] {
+        return std::make_shared<DorisAWSLogger>(logLevel);
+    };
+    Aws::InitAPI(_aws_options);
+}
+
+ClientFactory::~ClientFactory() {
+    Aws::ShutdownAPI(_aws_options);
+}
+
+ClientFactory& ClientFactory::instance() {
+    static ClientFactory ret;
+    return ret;
+}
+
+std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
+        const std::map<std::string, std::string>& prop) {
     StringCaseMap<std::string> properties(prop.begin(), prop.end());
-    Aws::Auth::AWSCredentials aws_cred;
-    Aws::Client::ClientConfiguration aws_config;
-    std::unique_ptr<Aws::S3::S3Client> client;
-    if (properties.find(S3_AK) != properties.end() && properties.find(S3_SK) 
!= properties.end() &&
-        properties.find(S3_ENDPOINT) != properties.end() &&
-        properties.find(S3_REGION) != properties.end()) {
-        aws_cred.SetAWSAccessKeyId(properties.find(S3_AK)->second);
-        aws_cred.SetAWSSecretKey(properties.find(S3_SK)->second);
-        DCHECK(!aws_cred.IsExpiredOrEmpty());
-        aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
-        aws_config.region = properties.find(S3_REGION)->second;
-        client.reset(new Aws::S3::S3Client(aws_cred, aws_config));
-    } else {
-        client.reset(nullptr);
+    if (properties.find(S3_AK) == properties.end() || properties.find(S3_SK) 
== properties.end() ||
+        properties.find(S3_ENDPOINT) == properties.end() ||
+        properties.find(S3_REGION) == properties.end()) {
+        DCHECK(false) << "aws properties is incorrect.";
+        LOG(ERROR) << "aws properties is incorrect.";
     }
-    return client;
+    Aws::Auth::AWSCredentials aws_cred(properties.find(S3_AK)->second,
+                                       properties.find(S3_SK)->second);
+    DCHECK(!aws_cred.IsExpiredOrEmpty());
+
+    Aws::Client::ClientConfiguration aws_config;
+    aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
+    aws_config.region = properties.find(S3_REGION)->second;
+    return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred), 
std::move(aws_config));
 }
+
 } // end namespace doris
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 57a9314..5fe383c 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <aws/core/Aws.h>
+
 #include <map>
 #include <memory>
 #include <string>
@@ -28,7 +30,19 @@ class S3Client;
 } // namespace Aws
 
 namespace doris {
+class ClientFactory {
+public:
+    ~ClientFactory();
+
+    static ClientFactory& instance();
+
+    std::shared_ptr<Aws::S3::S3Client> create(const std::map<std::string, 
std::string>& prop);
+
+private:
+    ClientFactory();
 
+    Aws::SDKOptions _aws_options;
+};
 std::unique_ptr<Aws::S3::S3Client> create_client(const std::map<std::string, 
std::string>& prop);
 
 } // end namespace doris
diff --git a/be/test/exec/s3_reader_test.cpp b/be/test/exec/s3_reader_test.cpp
index cc8ad75..a750f7b 100644
--- a/be/test/exec/s3_reader_test.cpp
+++ b/be/test/exec/s3_reader_test.cpp
@@ -42,7 +42,7 @@ public:
             : _aws_properties({{"AWS_ACCESS_KEY", AK},
                                {"AWS_SECRET_KEY", SK},
                                {"AWS_ENDPOINT", ENDPOINT},
-                               {"AWS_REGION", "bj"}}) {
+                               {"AWS_REGION", REGION}}) {
         _s3_base_path = BUCKET + "s3/" + gen_uuid();
     }
 
@@ -101,6 +101,8 @@ TEST_F(S3ReaderTest, normal) {
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(_content, verification_contents);
     ASSERT_EQ(_content.length(), total_read);
+    ASSERT_FALSE(eof);
+    st = reader->read((uint8_t*)&verification_contents[0], _content.length(), 
&total_read, &eof);
     ASSERT_TRUE(eof);
     int64_t t = 0;
     st = reader->tell(&t);
@@ -114,20 +116,9 @@ TEST_F(S3ReaderTest, normal) {
 } // end namespace doris
 
 int main(int argc, char** argv) {
-    // std::string conffile = std::string(getenv("DORIS_HOME")) + 
"/conf/be.conf";
-    // if (!doris::config::init(conffile.c_str(), false)) {
-    //     fprintf(stderr, "error read config file. \n");
-    //     return -1;
-    // }
-    // doris::init_glog("be-test");
-    // doris::CpuInfo::init();
     ::testing::InitGoogleTest(&argc, argv);
     int ret = 0;
-    Aws::SDKOptions options;
-    options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug;
-    Aws::InitAPI(options);
     // ak sk is secret
     // ret = RUN_ALL_TESTS();
-    Aws::ShutdownAPI(options);
     return ret;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
index a0e4dd5..e0bb077 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
@@ -22,10 +22,12 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.S3URI;
 
 import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.http.HttpStatus;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -71,6 +73,7 @@ public class S3Storage extends BlobStorage {
     private static final Logger LOG = LogManager.getLogger(S3Storage.class);
     private final CaseInsensitiveMap caseInsensitiveProperties;
     private S3Client client;
+    private boolean forceHostedStyle = false;
 
     public S3Storage(Map<String, String> properties) {
         caseInsensitiveProperties = new CaseInsensitiveMap();
@@ -84,8 +87,24 @@ public class S3Storage extends BlobStorage {
     public void setProperties(Map<String, String> properties) {
         super.setProperties(properties);
         caseInsensitiveProperties.putAll(properties);
+        // Virtual hosted-style is recommended in the s3 protocol.
+        // The path-style has been abandoned, but for some unexplainable 
reasons,
+        // the s3 client will determine whether the endpoint starts with `s3`
+        // when generating a virtual hosted-style request.
+        // If not, it will not be converted ( 
https://github.com/aws/aws-sdk-java-v2/pull/763),
+        // but the endpoints of many cloud service providers for object 
storage do not start with s3,
+        // so they cannot be converted to virtual hosted-style.
+        // Some of them, such as aliyun's oss, only support virtual 
hosted-style,
+        // so we need to do some additional conversion.
+
+        if 
(!caseInsensitiveProperties.get(S3_ENDPOINT).toString().toLowerCase().startsWith("s3"))
 {
+            forceHostedStyle = true;
+        } else {
+            forceHostedStyle = false;
+        }
 
     }
+
     private void checkS3() throws UserException {
         if (!caseInsensitiveProperties.containsKey(S3_REGION)) {
             throw new UserException("AWS_REGION not found.");
@@ -101,40 +120,46 @@ public class S3Storage extends BlobStorage {
         }
     }
 
-    private S3Client getClient() throws UserException {
+    private S3Client getClient(String bucket) throws UserException {
         if (client == null) {
             checkS3();
-            URI endpoint = 
URI.create(caseInsensitiveProperties.get(S3_ENDPOINT).toString());
+            URI tmpEndpoint = 
URI.create(caseInsensitiveProperties.get(S3_ENDPOINT).toString());
             AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
-                caseInsensitiveProperties.get(S3_AK).toString(),
-                caseInsensitiveProperties.get(S3_SK).toString());
+                    caseInsensitiveProperties.get(S3_AK).toString(),
+                    caseInsensitiveProperties.get(S3_SK).toString());
             StaticCredentialsProvider scp = 
StaticCredentialsProvider.create(awsBasic);
             EqualJitterBackoffStrategy backoffStrategy = 
EqualJitterBackoffStrategy
-                .builder()
-                .baseDelay(Duration.ofSeconds(1))
-                .maxBackoffTime(Duration.ofMinutes(1))
-                .build();
+                    .builder()
+                    .baseDelay(Duration.ofSeconds(1))
+                    .maxBackoffTime(Duration.ofMinutes(1))
+                    .build();
             // retry 3 time with Equal backoff
             RetryPolicy retryPolicy = RetryPolicy
-                .builder()
-                .numRetries(3)
-                .backoffStrategy(backoffStrategy)
-                .build();
+                    .builder()
+                    .numRetries(3)
+                    .backoffStrategy(backoffStrategy)
+                    .build();
             ClientOverrideConfiguration clientConf = 
ClientOverrideConfiguration
-                .builder()
-                // set retry policy
-                .retryPolicy(retryPolicy)
-                // using AwsS3V4Signer
-                .putAdvancedOption(SdkAdvancedClientOption.SIGNER, 
AwsS3V4Signer.create())
-                .build();
+                    .builder()
+                    // set retry policy
+                    .retryPolicy(retryPolicy)
+                    // using AwsS3V4Signer
+                    .putAdvancedOption(SdkAdvancedClientOption.SIGNER, 
AwsS3V4Signer.create())
+                    .build();
+            URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
+                    URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + 
"." + tmpEndpoint.getHost()).toString());
             client = S3Client.builder()
-                .endpointOverride(endpoint)
-                .credentialsProvider(scp)
-                
.region(Region.of(caseInsensitiveProperties.get(S3_REGION).toString()))
-                .overrideConfiguration(clientConf)
-                // disable chunkedEncoding because of bos not supported
-                
.serviceConfiguration(S3Configuration.builder().chunkedEncodingEnabled(false).build())
-                .build();
+                    .endpointOverride(endpoint)
+                    .credentialsProvider(scp)
+                    
.region(Region.of(caseInsensitiveProperties.get(S3_REGION).toString()))
+                    .overrideConfiguration(clientConf)
+                    // disable chunkedEncoding because of bos not supported
+                    // use virtual hosted-style access
+                    .serviceConfiguration(S3Configuration.builder()
+                            .chunkedEncodingEnabled(false)
+                            .pathStyleAccessEnabled(false)
+                            .build())
+                    .build();
         }
         return client;
     }
@@ -142,39 +167,37 @@ public class S3Storage extends BlobStorage {
     @Override
     public Status downloadWithFileSize(String remoteFilePath, String 
localFilePath, long fileSize) {
         long start = System.currentTimeMillis();
-        S3URI uri = new S3URI(remoteFilePath);
+        S3URI uri = new S3URI(remoteFilePath, forceHostedStyle);
         // Write the data to a local file
         File localFile = new File(localFilePath);
         if (localFile.exists()) {
             try {
                 Files.walk(Paths.get(localFilePath), 
FileVisitOption.FOLLOW_LINKS)
-                    .sorted(Comparator.reverseOrder())
-                    .map(Path::toFile)
-                    .forEach(File::delete);
+                        .sorted(Comparator.reverseOrder())
+                        .map(Path::toFile)
+                        .forEach(File::delete);
             } catch (IOException e) {
                 return new Status(
-                    Status.ErrCode.COMMON_ERROR, "failed to delete exist local 
file: " + localFilePath);
+                        Status.ErrCode.COMMON_ERROR, "failed to delete exist 
local file: " + localFilePath);
             }
         }
         try {
-            GetObjectRequest getObjectRequest =
-                
GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build();
-            GetObjectResponse response = 
getClient().getObject(getObjectRequest, localFile.toPath());
+            GetObjectResponse response = 
getClient(uri.getVirtualBucket()).getObject(GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
 localFile.toPath());
             if (localFile.length() == fileSize) {
                 LOG.info(
-                    "finished to download from {} to {} with size: {}. cost {} 
ms",
-                    remoteFilePath,
-                    localFilePath,
-                    fileSize,
-                    (System.currentTimeMillis() - start));
+                        "finished to download from {} to {} with size: {}. 
cost {} ms",
+                        remoteFilePath,
+                        localFilePath,
+                        fileSize,
+                        (System.currentTimeMillis() - start));
                 return Status.OK;
             } else {
                 return new Status(Status.ErrCode.COMMON_ERROR, 
response.toString());
             }
         } catch (S3Exception s3Exception) {
             return new Status(
-                Status.ErrCode.COMMON_ERROR,
-                "get file from s3 error: " + 
s3Exception.awsErrorDetails().errorMessage());
+                    Status.ErrCode.COMMON_ERROR,
+                    "get file from s3 error: " + 
s3Exception.awsErrorDetails().errorMessage());
         } catch (UserException ue) {
             LOG.error("connect to s3 failed: ", ue);
             return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 
failed: " + ue.getMessage());
@@ -185,13 +208,13 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status directUpload(String content, String remoteFile) {
-        S3URI uri = new S3URI(remoteFile);
+        S3URI uri = new S3URI(remoteFile, forceHostedStyle);
         try {
             PutObjectResponse response =
-                getClient()
-                    .putObject(
-                        
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
-                        RequestBody.fromBytes(content.getBytes()));
+                    getClient(uri.getVirtualBucket())
+                            .putObject(
+                                    
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
+                                    RequestBody.fromBytes(content.getBytes()));
             LOG.info("upload content success: " + response.eTag());
             return Status.OK;
         } catch (S3Exception e) {
@@ -205,15 +228,15 @@ public class S3Storage extends BlobStorage {
 
     public Status copy(String origFilePath, String destFilePath) {
         S3URI origUri = new S3URI(origFilePath);
-        S3URI descUri = new S3URI(destFilePath);
+        S3URI descUri = new S3URI(destFilePath, forceHostedStyle);
         try {
-            getClient()
-                .copyObject(
-                    CopyObjectRequest.builder()
-                        .copySource(origUri.getBucket() + "/" + 
origUri.getKey())
-                        .destinationBucket(descUri.getBucket())
-                        .destinationKey(descUri.getKey())
-                        .build());
+            getClient(descUri.getVirtualBucket())
+                    .copyObject(
+                            CopyObjectRequest.builder()
+                                    .copySource(origUri.getBucket() + "/" + 
origUri.getKey())
+                                    .destinationBucket(descUri.getBucket())
+                                    .destinationKey(descUri.getKey())
+                                    .build());
             return Status.OK;
         } catch (S3Exception e) {
             LOG.error("copy file failed: ", e);
@@ -226,13 +249,13 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status upload(String localPath, String remotePath) {
-        S3URI uri = new S3URI(remotePath);
+        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
             PutObjectResponse response =
-                getClient()
-                    .putObject(
-                        
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
-                        RequestBody.fromFile(new File(localPath)));
+                    getClient(uri.getVirtualBucket())
+                            .putObject(
+                                    
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
+                                    RequestBody.fromFile(new File(localPath)));
             LOG.info("upload file " + localPath + " success: " + 
response.eTag());
             return Status.OK;
         } catch (S3Exception e) {
@@ -256,12 +279,12 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status delete(String remotePath) {
-        S3URI uri = new S3URI(remotePath);
+        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
             DeleteObjectResponse response =
-                getClient()
-                    .deleteObject(
-                        
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
+                    getClient(uri.getVirtualBucket())
+                            .deleteObject(
+                                    
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
             LOG.info("delete file " + remotePath + " success: " + 
response.toString());
             return Status.OK;
         } catch (S3Exception e) {
@@ -289,6 +312,7 @@ public class S3Storage extends BlobStorage {
             String s3AK = caseInsensitiveProperties.get(S3_AK).toString();
             String s3Sk = caseInsensitiveProperties.get(S3_SK).toString();
             String s3Endpoint = 
caseInsensitiveProperties.get(S3_ENDPOINT).toString();
+            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
             conf.set("fs.s3a.access.key", s3AK);
             conf.set("fs.s3a.secret.key", s3Sk);
             conf.set("fs.s3a.endpoint", s3Endpoint);
@@ -301,7 +325,7 @@ public class S3Storage extends BlobStorage {
                 return Status.OK;
             }
             for (FileStatus fileStatus : files) {
-                RemoteFile remoteFile = new 
RemoteFile(fileNameOnly?fileStatus.getPath().getName():fileStatus.getPath().toString(),
 !fileStatus.isDirectory(), fileStatus.isDirectory()? -1:fileStatus.getLen());
+                RemoteFile remoteFile = new RemoteFile(fileNameOnly ? 
fileStatus.getPath().getName() : fileStatus.getPath().toString(), 
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen());
                 result.add(remoteFile);
             }
         } catch (FileNotFoundException e) {
@@ -319,13 +343,13 @@ public class S3Storage extends BlobStorage {
         if (!remotePath.endsWith("/")) {
             remotePath += "/";
         }
-        S3URI uri = new S3URI(remotePath);
+        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
             PutObjectResponse response =
-                getClient()
-                    .putObject(
-                        
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
-                        RequestBody.empty());
+                    getClient(uri.getVirtualBucket())
+                            .putObject(
+                                    
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
+                                    RequestBody.empty());
             LOG.info("makeDir success: " + response.eTag());
             return Status.OK;
         } catch (S3Exception e) {
@@ -339,10 +363,10 @@ public class S3Storage extends BlobStorage {
 
     @Override
     public Status checkPathExist(String remotePath) {
-        S3URI uri = new S3URI(remotePath);
+        S3URI uri = new S3URI(remotePath, forceHostedStyle);
         try {
-            getClient()
-                
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
+            getClient(uri.getVirtualBucket())
+                    
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
             return Status.OK;
         } catch (S3Exception e) {
             if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
@@ -362,3 +386,4 @@ public class S3Storage extends BlobStorage {
         return StorageBackend.StorageType.S3;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
index e79d395..0aadfe4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
@@ -41,8 +41,10 @@ public class S3URI {
 
     private String scheme;
     private final String location;
+    private final String virtualBucket;
     private final String bucket;
     private final String key;
+    private boolean forceHosted;
 
     /**
      * Creates a new S3URI based on the bucket and key parsed from the 
location as defined in:
@@ -54,8 +56,13 @@ public class S3URI {
      * @param location fully qualified URI
      */
     public S3URI(String location) {
+        this(location, false);
+    }
+
+    public S3URI(String location, boolean forceHosted) {
         Preconditions.checkNotNull(location, "Location cannot be null.");
         this.location = location;
+        this.forceHosted = forceHosted;
         String[] schemeSplit = location.split(SCHEME_DELIM);
         Preconditions.checkState(schemeSplit.length == 2, "Invalid S3 URI: 
%s", location);
 
@@ -64,13 +71,24 @@ public class S3URI {
         String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
         Preconditions.checkState(authoritySplit.length == 2, "Invalid S3 URI: 
%s", location);
         Preconditions.checkState(!authoritySplit[1].trim().isEmpty(), "Invalid 
S3 key: %s", location);
-        this.bucket = authoritySplit[0];
-
         // Strip query and fragment if they exist
         String path = authoritySplit[1];
         path = path.split(QUERY_DELIM)[0];
         path = path.split(FRAGMENT_DELIM)[0];
-        key = path;
+        if (forceHosted) {
+            this.virtualBucket = authoritySplit[0];
+            String[] paths = path.split("/", 2);
+            this.bucket = paths[0];
+            if (paths.length > 1) {
+                key = paths[1];
+            } else {
+                key = "";
+            }
+        } else {
+            this.virtualBucket = "";
+            this.bucket = authoritySplit[0];
+            key = path;
+        }
     }
 
     public List<String> expand(String path) {
@@ -85,6 +103,10 @@ public class S3URI {
         return scheme + "://" + bucket;
     }
 
+    public String getVirtualBucket() {
+        return virtualBucket;
+    }
+
     /**
      * @return S3 bucket
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 15436cf..5a2f973 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -67,7 +67,7 @@ import com.google.gson.annotations.SerializedName;
 import com.google.gson.reflect.TypeToken;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
-import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+import  org.apache.commons.lang3.reflect.TypeUtils;
 
 /*
  * Some utilities about Gson.
@@ -363,7 +363,7 @@ public class GsonUtils {
                                              final JsonDeserializationContext 
context) throws JsonParseException
         {
             final Type type2 =
-                    ParameterizedTypeImpl.make(Map.class, ((ParameterizedType) 
type).getActualTypeArguments(), null);
+                    TypeUtils.parameterize(Map.class, ((ParameterizedType) 
type).getActualTypeArguments());
             final Map<?,?> map = context.deserialize(json, type2);
             return ImmutableMap.copyOf(map);
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to