This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba38973 use virtual hosted-style request to access object store
(#5894)
ba38973 is described below
commit ba38973209e70312e052f0220ef783180a1f4442
Author: Zhengguo Yang <[email protected]>
AuthorDate: Thu May 27 15:52:07 2021 +0800
use virtual hosted-style request to access object store (#5894)
* use virtual hosted-style request to access object store
---
be/src/common/daemon.cpp | 18 +--
be/src/exec/s3_reader.cpp | 4 +-
be/src/exec/s3_reader.h | 2 +-
be/src/exec/s3_writer.cpp | 12 +-
be/src/exec/s3_writer.h | 2 +-
be/src/util/s3_storage_backend.cpp | 2 +-
be/src/util/s3_storage_backend.h | 2 +-
be/src/util/s3_util.cpp | 56 ++++---
be/src/util/s3_util.h | 14 ++
be/test/exec/s3_reader_test.cpp | 15 +-
.../java/org/apache/doris/backup/S3Storage.java | 167 ++++++++++++---------
.../java/org/apache/doris/common/util/S3URI.java | 28 +++-
.../org/apache/doris/persist/gson/GsonUtils.java | 4 +-
13 files changed, 192 insertions(+), 134 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5b21ea0..d70ee07 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -17,11 +17,9 @@
#include "common/daemon.h"
-#include <signal.h>
-
-#include <aws/core/Aws.h>
#include <gflags/gflags.h>
#include <gperftools/malloc_extension.h>
+#include <signal.h>
#include "common/config.h"
#include "exprs/bitmap_function.h"
@@ -68,8 +66,6 @@ namespace doris {
bool k_doris_exit = false;
-Aws::SDKOptions aws_options;
-
void Daemon::tcmalloc_gc_thread() {
while
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(10))) {
size_t used_size = 0;
@@ -268,14 +264,6 @@ void Daemon::init(int argc, char** argv, const
std::vector<StorePath>& paths) {
HllFunctions::init();
HashFunctions::init();
TopNFunctions::init();
- // disable EC2 metadata service
- setenv("AWS_EC2_METADATA_DISABLED", "true", false);
- Aws::Utils::Logging::LogLevel logLevel =
static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
- aws_options.loggingOptions.logLevel = logLevel;
- aws_options.loggingOptions.logger_create_fn = [logLevel] {
- return std::make_shared<DorisAWSLogger>(logLevel);
- };
- Aws::InitAPI(aws_options);
LOG(INFO) << CpuInfo::debug_string();
LOG(INFO) << DiskInfo::debug_string();
@@ -303,7 +291,8 @@ void Daemon::start() {
if (config::enable_metric_calculator) {
CHECK(DorisMetrics::instance()->is_inited())
<< "enable metric calculator failed, maybe you set
enable_system_metrics to false "
- << " or there may be some hardware error which causes metric
init failed, please check log first;"
+ << " or there may be some hardware error which causes metric
init failed, please "
+ "check log first;"
<< " you can set enable_metric_calculator = false to quickly
recover ";
st = Thread::create(
@@ -325,7 +314,6 @@ void Daemon::stop() {
if (_calculate_metrics_thread) {
_calculate_metrics_thread->join();
}
- Aws::ShutdownAPI(aws_options);
}
} // namespace doris
diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp
index ddc71c6..ae5cc3b 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/exec/s3_reader.cpp
@@ -41,8 +41,8 @@ S3Reader::S3Reader(const std::map<std::string, std::string>&
properties, const s
_uri(path),
_cur_offset(start_offset),
_file_size(0),
- _closed(false) {
- _client = create_client(_properties);
+ _closed(false),
+ _client(ClientFactory::instance().create(_properties)) {
DCHECK(_client) << "init aws s3 client error.";
}
diff --git a/be/src/exec/s3_reader.h b/be/src/exec/s3_reader.h
index cd5c185..1676ba5 100644
--- a/be/src/exec/s3_reader.h
+++ b/be/src/exec/s3_reader.h
@@ -64,6 +64,6 @@ private:
int64_t _cur_offset;
int64_t _file_size;
bool _closed;
- std::unique_ptr<Aws::S3::S3Client> _client;
+ std::shared_ptr<Aws::S3::S3Client> _client;
};
} // end namespace doris
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 557c004..a8cb377 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -41,12 +41,10 @@ S3Writer::S3Writer(const std::map<std::string,
std::string>& properties, const s
_path(path),
_uri(path),
_sync_needed(false),
- _temp_file(Aws::MakeShared<Aws::Utils::TempFile>(
- "S3WRITER",
- // "/tmp/doris_tmp_", "s3tmp",
+ _client(ClientFactory::instance().create(_properties)),
+ _temp_file(std::make_shared<Aws::Utils::TempFile>(
std::ios_base::binary | std::ios_base::trunc |
std::ios_base::in |
- std::ios_base::out)) {
- _client = create_client(_properties);
+ std::ios_base::out)) {
DCHECK(_client) << "init aws s3 client error.";
}
@@ -56,8 +54,8 @@ S3Writer::~S3Writer() {
Status S3Writer::open() {
CHECK_S3_CLIENT(_client);
- if (!_uri.parse()) {
- return Status::InvalidArgument("s3 uri is invalid: " + _path);
+ if (!_uri.parse()) {
+ return Status::InvalidArgument("s3 uri is invalid: " + _path);
}
Aws::S3::Model::HeadObjectRequest request;
request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
diff --git a/be/src/exec/s3_writer.h b/be/src/exec/s3_writer.h
index e070a99..663703f 100644
--- a/be/src/exec/s3_writer.h
+++ b/be/src/exec/s3_writer.h
@@ -53,8 +53,8 @@ private:
std::string _path;
S3URI _uri;
bool _sync_needed;
+ std::shared_ptr<Aws::S3::S3Client> _client;
std::shared_ptr<Aws::Utils::TempFile> _temp_file;
- std::unique_ptr<Aws::S3::S3Client> _client;
};
} // end namespace doris
diff --git a/be/src/util/s3_storage_backend.cpp
b/be/src/util/s3_storage_backend.cpp
index bd1a7d2..5c786a9 100644
--- a/be/src/util/s3_storage_backend.cpp
+++ b/be/src/util/s3_storage_backend.cpp
@@ -61,7 +61,7 @@ namespace doris {
S3StorageBackend::S3StorageBackend(const std::map<std::string, std::string>&
prop)
: _properties(prop) {
- _client = create_client(_properties);
+ _client = ClientFactory::instance().create(_properties);
DCHECK(_client) << "init aws s3 client error.";
}
diff --git a/be/src/util/s3_storage_backend.h b/be/src/util/s3_storage_backend.h
index 09c52f8..4518227 100644
--- a/be/src/util/s3_storage_backend.h
+++ b/be/src/util/s3_storage_backend.h
@@ -47,7 +47,7 @@ private:
template <typename AwsOutcome>
std::string error_msg(const AwsOutcome& outcome);
const std::map<std::string, std::string>& _properties;
- std::unique_ptr<Aws::S3::S3Client> _client;
+ std::shared_ptr<Aws::S3::S3Client> _client;
};
} // end namespace doris
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 17baa89..364f580 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -17,12 +17,12 @@
#include "util/s3_util.h"
-#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <util/string_util.h>
-#include "common/logging.h"
+#include "common/config.h"
+#include "util/logging.h"
namespace doris {
@@ -31,23 +31,43 @@ const static std::string S3_SK = "AWS_SECRET_KEY";
const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
const static std::string S3_REGION = "AWS_REGION";
-std::unique_ptr<Aws::S3::S3Client> create_client(const std::map<std::string,
std::string>& prop) {
+ClientFactory::ClientFactory() {
+ _aws_options = Aws::SDKOptions{};
+ Aws::Utils::Logging::LogLevel logLevel =
+ static_cast<Aws::Utils::Logging::LogLevel>(config::aws_log_level);
+ _aws_options.loggingOptions.logLevel = logLevel;
+ _aws_options.loggingOptions.logger_create_fn = [logLevel] {
+ return std::make_shared<DorisAWSLogger>(logLevel);
+ };
+ Aws::InitAPI(_aws_options);
+}
+
+ClientFactory::~ClientFactory() {
+ Aws::ShutdownAPI(_aws_options);
+}
+
+ClientFactory& ClientFactory::instance() {
+ static ClientFactory ret;
+ return ret;
+}
+
+std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
+ const std::map<std::string, std::string>& prop) {
StringCaseMap<std::string> properties(prop.begin(), prop.end());
- Aws::Auth::AWSCredentials aws_cred;
- Aws::Client::ClientConfiguration aws_config;
- std::unique_ptr<Aws::S3::S3Client> client;
- if (properties.find(S3_AK) != properties.end() && properties.find(S3_SK)
!= properties.end() &&
- properties.find(S3_ENDPOINT) != properties.end() &&
- properties.find(S3_REGION) != properties.end()) {
- aws_cred.SetAWSAccessKeyId(properties.find(S3_AK)->second);
- aws_cred.SetAWSSecretKey(properties.find(S3_SK)->second);
- DCHECK(!aws_cred.IsExpiredOrEmpty());
- aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
- aws_config.region = properties.find(S3_REGION)->second;
- client.reset(new Aws::S3::S3Client(aws_cred, aws_config));
- } else {
- client.reset(nullptr);
+ if (properties.find(S3_AK) == properties.end() || properties.find(S3_SK)
== properties.end() ||
+ properties.find(S3_ENDPOINT) == properties.end() ||
+ properties.find(S3_REGION) == properties.end()) {
+ DCHECK(false) << "aws properties is incorrect.";
+ LOG(ERROR) << "aws properties is incorrect.";
}
- return client;
+ Aws::Auth::AWSCredentials aws_cred(properties.find(S3_AK)->second,
+ properties.find(S3_SK)->second);
+ DCHECK(!aws_cred.IsExpiredOrEmpty());
+
+ Aws::Client::ClientConfiguration aws_config;
+ aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
+ aws_config.region = properties.find(S3_REGION)->second;
+ return std::make_shared<Aws::S3::S3Client>(std::move(aws_cred),
std::move(aws_config));
}
+
} // end namespace doris
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 57a9314..5fe383c 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -17,6 +17,8 @@
#pragma once
+#include <aws/core/Aws.h>
+
#include <map>
#include <memory>
#include <string>
@@ -28,7 +30,19 @@ class S3Client;
} // namespace Aws
namespace doris {
+class ClientFactory {
+public:
+ ~ClientFactory();
+
+ static ClientFactory& instance();
+
+ std::shared_ptr<Aws::S3::S3Client> create(const std::map<std::string,
std::string>& prop);
+
+private:
+ ClientFactory();
+ Aws::SDKOptions _aws_options;
+};
std::unique_ptr<Aws::S3::S3Client> create_client(const std::map<std::string,
std::string>& prop);
} // end namespace doris
diff --git a/be/test/exec/s3_reader_test.cpp b/be/test/exec/s3_reader_test.cpp
index cc8ad75..a750f7b 100644
--- a/be/test/exec/s3_reader_test.cpp
+++ b/be/test/exec/s3_reader_test.cpp
@@ -42,7 +42,7 @@ public:
: _aws_properties({{"AWS_ACCESS_KEY", AK},
{"AWS_SECRET_KEY", SK},
{"AWS_ENDPOINT", ENDPOINT},
- {"AWS_REGION", "bj"}}) {
+ {"AWS_REGION", REGION}}) {
_s3_base_path = BUCKET + "s3/" + gen_uuid();
}
@@ -101,6 +101,8 @@ TEST_F(S3ReaderTest, normal) {
ASSERT_TRUE(st.ok());
ASSERT_EQ(_content, verification_contents);
ASSERT_EQ(_content.length(), total_read);
+ ASSERT_FALSE(eof);
+ st = reader->read((uint8_t*)&verification_contents[0], _content.length(),
&total_read, &eof);
ASSERT_TRUE(eof);
int64_t t = 0;
st = reader->tell(&t);
@@ -114,20 +116,9 @@ TEST_F(S3ReaderTest, normal) {
} // end namespace doris
int main(int argc, char** argv) {
- // std::string conffile = std::string(getenv("DORIS_HOME")) +
"/conf/be.conf";
- // if (!doris::config::init(conffile.c_str(), false)) {
- // fprintf(stderr, "error read config file. \n");
- // return -1;
- // }
- // doris::init_glog("be-test");
- // doris::CpuInfo::init();
::testing::InitGoogleTest(&argc, argv);
int ret = 0;
- Aws::SDKOptions options;
- options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug;
- Aws::InitAPI(options);
// ak sk is secret
// ret = RUN_ALL_TESTS();
- Aws::ShutdownAPI(options);
return ret;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
index a0e4dd5..e0bb077 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
@@ -22,10 +22,12 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.http.HttpStatus;
+import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,6 +73,7 @@ public class S3Storage extends BlobStorage {
private static final Logger LOG = LogManager.getLogger(S3Storage.class);
private final CaseInsensitiveMap caseInsensitiveProperties;
private S3Client client;
+ private boolean forceHostedStyle = false;
public S3Storage(Map<String, String> properties) {
caseInsensitiveProperties = new CaseInsensitiveMap();
@@ -84,8 +87,24 @@ public class S3Storage extends BlobStorage {
public void setProperties(Map<String, String> properties) {
super.setProperties(properties);
caseInsensitiveProperties.putAll(properties);
+ // Virtual hosted-style is recommended in the s3 protocol.
+ // The path-style has been abandoned, but for some unexplainable
reasons.
+ // The s3 client will determine whether the endpoint starts with `s3`
+ // when generating a virtual hosted-style request.
+ // If not, it will not be converted (
https://github.com/aws/aws-sdk-java-v2/pull/763),
+ // but the endpoints of many cloud service providers for object
storage do not start with s3,
+ // so they cannot be converted to virtual hosted-style.
+ // Some of them, such as aliyun's oss, only support virtual
hosted-style,
+ // so we need to do some additional conversion.
+
+ if
(!caseInsensitiveProperties.get(S3_ENDPOINT).toString().toLowerCase().startsWith("s3"))
{
+ forceHostedStyle = true;
+ } else {
+ forceHostedStyle = false;
+ }
}
+
private void checkS3() throws UserException {
if (!caseInsensitiveProperties.containsKey(S3_REGION)) {
throw new UserException("AWS_REGION not found.");
@@ -101,40 +120,46 @@ public class S3Storage extends BlobStorage {
}
}
- private S3Client getClient() throws UserException {
+ private S3Client getClient(String bucket) throws UserException {
if (client == null) {
checkS3();
- URI endpoint =
URI.create(caseInsensitiveProperties.get(S3_ENDPOINT).toString());
+ URI tmpEndpoint =
URI.create(caseInsensitiveProperties.get(S3_ENDPOINT).toString());
AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
- caseInsensitiveProperties.get(S3_AK).toString(),
- caseInsensitiveProperties.get(S3_SK).toString());
+ caseInsensitiveProperties.get(S3_AK).toString(),
+ caseInsensitiveProperties.get(S3_SK).toString());
StaticCredentialsProvider scp =
StaticCredentialsProvider.create(awsBasic);
EqualJitterBackoffStrategy backoffStrategy =
EqualJitterBackoffStrategy
- .builder()
- .baseDelay(Duration.ofSeconds(1))
- .maxBackoffTime(Duration.ofMinutes(1))
- .build();
+ .builder()
+ .baseDelay(Duration.ofSeconds(1))
+ .maxBackoffTime(Duration.ofMinutes(1))
+ .build();
// retry 3 time with Equal backoff
RetryPolicy retryPolicy = RetryPolicy
- .builder()
- .numRetries(3)
- .backoffStrategy(backoffStrategy)
- .build();
+ .builder()
+ .numRetries(3)
+ .backoffStrategy(backoffStrategy)
+ .build();
ClientOverrideConfiguration clientConf =
ClientOverrideConfiguration
- .builder()
- // set retry policy
- .retryPolicy(retryPolicy)
- // using AwsS3V4Signer
- .putAdvancedOption(SdkAdvancedClientOption.SIGNER,
AwsS3V4Signer.create())
- .build();
+ .builder()
+ // set retry policy
+ .retryPolicy(retryPolicy)
+ // using AwsS3V4Signer
+ .putAdvancedOption(SdkAdvancedClientOption.SIGNER,
AwsS3V4Signer.create())
+ .build();
+ URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
+ URI.create(new URIBuilder(tmpEndpoint).setHost(bucket +
"." + tmpEndpoint.getHost()).toString());
client = S3Client.builder()
- .endpointOverride(endpoint)
- .credentialsProvider(scp)
-
.region(Region.of(caseInsensitiveProperties.get(S3_REGION).toString()))
- .overrideConfiguration(clientConf)
- // disable chunkedEncoding because of bos not supported
-
.serviceConfiguration(S3Configuration.builder().chunkedEncodingEnabled(false).build())
- .build();
+ .endpointOverride(endpoint)
+ .credentialsProvider(scp)
+
.region(Region.of(caseInsensitiveProperties.get(S3_REGION).toString()))
+ .overrideConfiguration(clientConf)
+ // disable chunkedEncoding because of bos not supported
+ // use virtual hosted-style access
+ .serviceConfiguration(S3Configuration.builder()
+ .chunkedEncodingEnabled(false)
+ .pathStyleAccessEnabled(false)
+ .build())
+ .build();
}
return client;
}
@@ -142,39 +167,37 @@ public class S3Storage extends BlobStorage {
@Override
public Status downloadWithFileSize(String remoteFilePath, String
localFilePath, long fileSize) {
long start = System.currentTimeMillis();
- S3URI uri = new S3URI(remoteFilePath);
+ S3URI uri = new S3URI(remoteFilePath, forceHostedStyle);
// Write the data to a local file
File localFile = new File(localFilePath);
if (localFile.exists()) {
try {
Files.walk(Paths.get(localFilePath),
FileVisitOption.FOLLOW_LINKS)
- .sorted(Comparator.reverseOrder())
- .map(Path::toFile)
- .forEach(File::delete);
+ .sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
} catch (IOException e) {
return new Status(
- Status.ErrCode.COMMON_ERROR, "failed to delete exist local
file: " + localFilePath);
+ Status.ErrCode.COMMON_ERROR, "failed to delete exist
local file: " + localFilePath);
}
}
try {
- GetObjectRequest getObjectRequest =
-
GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build();
- GetObjectResponse response =
getClient().getObject(getObjectRequest, localFile.toPath());
+ GetObjectResponse response =
getClient(uri.getVirtualBucket()).getObject(GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
localFile.toPath());
if (localFile.length() == fileSize) {
LOG.info(
- "finished to download from {} to {} with size: {}. cost {}
ms",
- remoteFilePath,
- localFilePath,
- fileSize,
- (System.currentTimeMillis() - start));
+ "finished to download from {} to {} with size: {}.
cost {} ms",
+ remoteFilePath,
+ localFilePath,
+ fileSize,
+ (System.currentTimeMillis() - start));
return Status.OK;
} else {
return new Status(Status.ErrCode.COMMON_ERROR,
response.toString());
}
} catch (S3Exception s3Exception) {
return new Status(
- Status.ErrCode.COMMON_ERROR,
- "get file from s3 error: " +
s3Exception.awsErrorDetails().errorMessage());
+ Status.ErrCode.COMMON_ERROR,
+ "get file from s3 error: " +
s3Exception.awsErrorDetails().errorMessage());
} catch (UserException ue) {
LOG.error("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3
failed: " + ue.getMessage());
@@ -185,13 +208,13 @@ public class S3Storage extends BlobStorage {
@Override
public Status directUpload(String content, String remoteFile) {
- S3URI uri = new S3URI(remoteFile);
+ S3URI uri = new S3URI(remoteFile, forceHostedStyle);
try {
PutObjectResponse response =
- getClient()
- .putObject(
-
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
- RequestBody.fromBytes(content.getBytes()));
+ getClient(uri.getVirtualBucket())
+ .putObject(
+
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
+ RequestBody.fromBytes(content.getBytes()));
LOG.info("upload content success: " + response.eTag());
return Status.OK;
} catch (S3Exception e) {
@@ -205,15 +228,15 @@ public class S3Storage extends BlobStorage {
public Status copy(String origFilePath, String destFilePath) {
S3URI origUri = new S3URI(origFilePath);
- S3URI descUri = new S3URI(destFilePath);
+ S3URI descUri = new S3URI(destFilePath, forceHostedStyle);
try {
- getClient()
- .copyObject(
- CopyObjectRequest.builder()
- .copySource(origUri.getBucket() + "/" +
origUri.getKey())
- .destinationBucket(descUri.getBucket())
- .destinationKey(descUri.getKey())
- .build());
+ getClient(descUri.getVirtualBucket())
+ .copyObject(
+ CopyObjectRequest.builder()
+ .copySource(origUri.getBucket() + "/" +
origUri.getKey())
+ .destinationBucket(descUri.getBucket())
+ .destinationKey(descUri.getKey())
+ .build());
return Status.OK;
} catch (S3Exception e) {
LOG.error("copy file failed: ", e);
@@ -226,13 +249,13 @@ public class S3Storage extends BlobStorage {
@Override
public Status upload(String localPath, String remotePath) {
- S3URI uri = new S3URI(remotePath);
+ S3URI uri = new S3URI(remotePath, forceHostedStyle);
try {
PutObjectResponse response =
- getClient()
- .putObject(
-
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
- RequestBody.fromFile(new File(localPath)));
+ getClient(uri.getVirtualBucket())
+ .putObject(
+
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
+ RequestBody.fromFile(new File(localPath)));
LOG.info("upload file " + localPath + " success: " +
response.eTag());
return Status.OK;
} catch (S3Exception e) {
@@ -256,12 +279,12 @@ public class S3Storage extends BlobStorage {
@Override
public Status delete(String remotePath) {
- S3URI uri = new S3URI(remotePath);
+ S3URI uri = new S3URI(remotePath, forceHostedStyle);
try {
DeleteObjectResponse response =
- getClient()
- .deleteObject(
-
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
+ getClient(uri.getVirtualBucket())
+ .deleteObject(
+
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
LOG.info("delete file " + remotePath + " success: " +
response.toString());
return Status.OK;
} catch (S3Exception e) {
@@ -289,6 +312,7 @@ public class S3Storage extends BlobStorage {
String s3AK = caseInsensitiveProperties.get(S3_AK).toString();
String s3Sk = caseInsensitiveProperties.get(S3_SK).toString();
String s3Endpoint =
caseInsensitiveProperties.get(S3_ENDPOINT).toString();
+ System.setProperty("com.amazonaws.services.s3.enableV4", "true");
conf.set("fs.s3a.access.key", s3AK);
conf.set("fs.s3a.secret.key", s3Sk);
conf.set("fs.s3a.endpoint", s3Endpoint);
@@ -301,7 +325,7 @@ public class S3Storage extends BlobStorage {
return Status.OK;
}
for (FileStatus fileStatus : files) {
- RemoteFile remoteFile = new
RemoteFile(fileNameOnly?fileStatus.getPath().getName():fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory()? -1:fileStatus.getLen());
+ RemoteFile remoteFile = new RemoteFile(fileNameOnly ?
fileStatus.getPath().getName() : fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen());
result.add(remoteFile);
}
} catch (FileNotFoundException e) {
@@ -319,13 +343,13 @@ public class S3Storage extends BlobStorage {
if (!remotePath.endsWith("/")) {
remotePath += "/";
}
- S3URI uri = new S3URI(remotePath);
+ S3URI uri = new S3URI(remotePath, forceHostedStyle);
try {
PutObjectResponse response =
- getClient()
- .putObject(
-
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
- RequestBody.empty());
+ getClient(uri.getVirtualBucket())
+ .putObject(
+
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
+ RequestBody.empty());
LOG.info("makeDir success: " + response.eTag());
return Status.OK;
} catch (S3Exception e) {
@@ -339,10 +363,10 @@ public class S3Storage extends BlobStorage {
@Override
public Status checkPathExist(String remotePath) {
- S3URI uri = new S3URI(remotePath);
+ S3URI uri = new S3URI(remotePath, forceHostedStyle);
try {
- getClient()
-
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
+ getClient(uri.getVirtualBucket())
+
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
return Status.OK;
} catch (S3Exception e) {
if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
@@ -362,3 +386,4 @@ public class S3Storage extends BlobStorage {
return StorageBackend.StorageType.S3;
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
index e79d395..0aadfe4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
@@ -41,8 +41,10 @@ public class S3URI {
private String scheme;
private final String location;
+ private final String virtualBucket;
private final String bucket;
private final String key;
+ private boolean forceHosted;
/**
* Creates a new S3URI based on the bucket and key parsed from the
location as defined in:
@@ -54,8 +56,13 @@ public class S3URI {
* @param location fully qualified URI
*/
public S3URI(String location) {
+ this(location, false);
+ }
+
+ public S3URI(String location, boolean forceHosted) {
Preconditions.checkNotNull(location, "Location cannot be null.");
this.location = location;
+ this.forceHosted = forceHosted;
String[] schemeSplit = location.split(SCHEME_DELIM);
Preconditions.checkState(schemeSplit.length == 2, "Invalid S3 URI:
%s", location);
@@ -64,13 +71,24 @@ public class S3URI {
String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
Preconditions.checkState(authoritySplit.length == 2, "Invalid S3 URI:
%s", location);
Preconditions.checkState(!authoritySplit[1].trim().isEmpty(), "Invalid
S3 key: %s", location);
- this.bucket = authoritySplit[0];
-
// Strip query and fragment if they exist
String path = authoritySplit[1];
path = path.split(QUERY_DELIM)[0];
path = path.split(FRAGMENT_DELIM)[0];
- key = path;
+ if (forceHosted) {
+ this.virtualBucket = authoritySplit[0];
+ String[] paths = path.split("/", 2);
+ this.bucket = paths[0];
+ if (paths.length > 1) {
+ key = paths[1];
+ } else {
+ key = "";
+ }
+ } else {
+ this.virtualBucket = "";
+ this.bucket = authoritySplit[0];
+ key = path;
+ }
}
public List<String> expand(String path) {
@@ -85,6 +103,10 @@ public class S3URI {
return scheme + "://" + bucket;
}
+ public String getVirtualBucket() {
+ return virtualBucket;
+ }
+
/**
* @return S3 bucket
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 15436cf..5a2f973 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -67,7 +67,7 @@ import com.google.gson.annotations.SerializedName;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
-import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
+import org.apache.commons.lang3.reflect.TypeUtils;
/*
* Some utilities about Gson.
@@ -363,7 +363,7 @@ public class GsonUtils {
final JsonDeserializationContext
context) throws JsonParseException
{
final Type type2 =
- ParameterizedTypeImpl.make(Map.class, ((ParameterizedType)
type).getActualTypeArguments(), null);
+ TypeUtils.parameterize(Map.class, ((ParameterizedType)
type).getActualTypeArguments());
final Map<?,?> map = context.deserialize(json, type2);
return ImmutableMap.copyOf(map);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]