This is an automated email from the ASF dual-hosted git repository.
lide pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7a40f2a5476 [branch-2.1](resource)fix check available fail when s3
aws_token is set and reset ak, sk failed on be. (#34219)
7a40f2a5476 is described below
commit 7a40f2a547667de156dfe911e5f42961938835af
Author: huanghg1994 <[email protected]>
AuthorDate: Thu May 9 19:06:14 2024 +0800
[branch-2.1](resource)fix check available fail when s3 aws_token is set and
reset ak, sk failed on be. (#34219)
---
be/src/agent/task_worker_pool.cpp | 3 ++-
be/src/io/fs/s3_file_system.cpp | 23 ++++++++++++++++++++++
be/src/io/fs/s3_file_system.h | 2 +-
.../java/org/apache/doris/catalog/S3Resource.java | 16 ++++++++++++---
.../property/constants/S3Properties.java | 1 +
gensrc/thrift/AgentService.thrift | 1 +
.../cold_heat_separation/policy/alter.groovy | 11 +++++++++++
7 files changed, 52 insertions(+), 5 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 1a53ec3b5ec..30c83fdb878 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1248,6 +1248,7 @@ void push_storage_policy_callback(StorageEngine& engine,
const TAgentTaskRequest
S3Conf s3_conf;
s3_conf.ak = std::move(resource.s3_storage_param.ak);
s3_conf.sk = std::move(resource.s3_storage_param.sk);
+ s3_conf.token = std::move(resource.s3_storage_param.token);
s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint);
s3_conf.region = std::move(resource.s3_storage_param.region);
s3_conf.prefix = std::move(resource.s3_storage_param.root_path);
@@ -1263,7 +1264,7 @@ void push_storage_policy_callback(StorageEngine& engine,
const TAgentTaskRequest
st = io::S3FileSystem::create(s3_conf,
std::to_string(resource.id), &fs);
} else {
fs =
std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
- fs->set_conf(s3_conf);
+ st = fs->set_conf(s3_conf);
}
if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 7b443ca6804..947020c89c5 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -97,6 +97,29 @@ namespace io {
RETURN_IF_ERROR(get_key(path, &key));
#endif
+// Replaces the credentials (ak, sk, token) of this file system and rebuilds
+// the S3 client from the resulting configuration.
+//
+// Only `ak`, `sk` and `token` are taken from `s3_conf`; every other field of
+// the current configuration is kept. When all three already match the current
+// configuration the call is a no-op.
+//
+// Returns Status::OK() on success or when nothing changed, and
+// Status::InternalError when a client cannot be created; in that case the
+// current client and configuration are left untouched.
+//
+// Guarded by external lock.
+Status S3FileSystem::set_conf(S3Conf s3_conf) {
+    if (s3_conf.ak == _s3_conf.ak && s3_conf.sk == _s3_conf.sk &&
+        s3_conf.token == _s3_conf.token) {
+        return Status::OK(); // Same conf
+    }
+
+    auto reset_conf = _s3_conf;
+    reset_conf.ak = std::move(s3_conf.ak);
+    reset_conf.sk = std::move(s3_conf.sk);
+    reset_conf.token = std::move(s3_conf.token);
+    // Build the client from the configuration that is about to be stored, so
+    // that `_client` and `_s3_conf` cannot drift apart.
+    auto client = S3ClientFactory::instance().create(reset_conf);
+    if (!client) {
+        // Report the configuration that was rejected, not the one still in use.
+        return Status::InternalError("failed to init s3 client with {}",
+                                     reset_conf.to_string());
+    }
+
+    {
+        std::lock_guard lock(_client_mu);
+        _client = std::move(client);
+    }
+    _s3_conf = std::move(reset_conf);
+    return Status::OK();
+}
+
std::string S3FileSystem::full_path(std::string_view key) const {
return fmt::format("{}/{}/{}", _s3_conf.endpoint, _s3_conf.bucket, key);
}
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index ddae28442dd..6bd1ae4ff17 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -59,7 +59,7 @@ public:
static Status create(S3Conf s3_conf, std::string id,
std::shared_ptr<S3FileSystem>* fs);
~S3FileSystem() override;
// Guarded by external lock.
- void set_conf(S3Conf s3_conf) { _s3_conf = std::move(s3_conf); }
+ Status set_conf(S3Conf s3_conf);
const S3Conf& s3_conf() const { return _s3_conf; }
std::shared_ptr<Aws::S3::S3Client> get_client() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index 5bcb5123c64..a2603897047 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -105,7 +105,8 @@ public class S3Resource extends Resource {
properties.putIfAbsent(S3Properties.REGION, region);
String ak = properties.get(S3Properties.ACCESS_KEY);
String sk = properties.get(S3Properties.SECRET_KEY);
- CloudCredentialWithEndpoint credential = new
CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk);
+ String token = properties.get(S3Properties.SESSION_TOKEN);
+ CloudCredentialWithEndpoint credential = new
CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);
if (needCheck) {
String bucketName = properties.get(S3Properties.BUCKET);
@@ -123,6 +124,7 @@ public class S3Resource extends Resource {
Map<String, String> propertiesPing = new HashMap<>();
propertiesPing.put(S3Properties.Env.ACCESS_KEY,
credential.getAccessKey());
propertiesPing.put(S3Properties.Env.SECRET_KEY,
credential.getSecretKey());
+ propertiesPing.put(S3Properties.Env.TOKEN,
credential.getSessionToken());
propertiesPing.put(S3Properties.Env.ENDPOINT,
credential.getEndpoint());
propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
propertiesPing.put(PropertyConverter.USE_PATH_STYLE,
@@ -188,6 +190,10 @@ public class S3Resource extends Resource {
writeLock();
for (Map.Entry<String, String> kv : properties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(),
kv.getValue());
+ if (kv.getKey().equals(S3Properties.Env.TOKEN)
+ || kv.getKey().equals(S3Properties.SESSION_TOKEN)) {
+ this.properties.put(kv.getKey(), kv.getValue());
+ }
}
++version;
writeUnlock();
@@ -197,11 +203,13 @@ public class S3Resource extends Resource {
private CloudCredentialWithEndpoint getS3PingCredentials(Map<String,
String> properties) {
String ak = properties.getOrDefault(S3Properties.ACCESS_KEY,
this.properties.get(S3Properties.ACCESS_KEY));
String sk = properties.getOrDefault(S3Properties.SECRET_KEY,
this.properties.get(S3Properties.SECRET_KEY));
+ String token = properties.getOrDefault(S3Properties.SESSION_TOKEN,
+ this.properties.get(S3Properties.SESSION_TOKEN));
String endpoint = properties.getOrDefault(S3Properties.ENDPOINT,
this.properties.get(S3Properties.ENDPOINT));
String pingEndpoint = "http://" + endpoint;
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
- return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk);
+ return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk,
token);
}
private boolean isNeedCheck(Map<String, String> newProperties) {
@@ -231,7 +239,9 @@ public class S3Resource extends Resource {
// it's dangerous to show password in show odbc resource,
// so we use empty string to replace the real password
if (entry.getKey().equals(S3Properties.Env.SECRET_KEY)
- || entry.getKey().equals(S3Properties.SECRET_KEY)) {
+ || entry.getKey().equals(S3Properties.SECRET_KEY)
+ || entry.getKey().equals(S3Properties.Env.TOKEN)
+ || entry.getKey().equals(S3Properties.SESSION_TOKEN)) {
result.addRow(Lists.newArrayList(name, lowerCaseType,
entry.getKey(), "******"));
} else {
result.addRow(Lists.newArrayList(name, lowerCaseType,
entry.getKey(), entry.getValue()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 3297a4bd550..747da72a95b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -261,6 +261,7 @@ public class S3Properties extends BaseProperties {
s3Info.setRegion(properties.get(S3Properties.REGION));
s3Info.setAk(properties.get(S3Properties.ACCESS_KEY));
s3Info.setSk(properties.get(S3Properties.SECRET_KEY));
+ s3Info.setToken(properties.get(S3Properties.SESSION_TOKEN));
s3Info.setRootPath(properties.get(S3Properties.ROOT_PATH));
s3Info.setBucket(properties.get(S3Properties.BUCKET));
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index a59717cfa4a..9eb4ece8d72 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -73,6 +73,7 @@ struct TS3StorageParam {
8: optional string root_path
9: optional string bucket
10: optional bool use_path_style = false
+ 11: optional string token
}
struct TStoragePolicy {
diff --git a/regression-test/suites/cold_heat_separation/policy/alter.groovy
b/regression-test/suites/cold_heat_separation/policy/alter.groovy
index 672f7942763..a04387774a6 100644
--- a/regression-test/suites/cold_heat_separation/policy/alter.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/alter.groovy
@@ -39,6 +39,7 @@ suite("alter_policy") {
"AWS_ROOT_PATH" = "path/to/rootaaaa",
"AWS_ACCESS_KEY" = "bbba",
"AWS_SECRET_KEY" = "aaaa",
+ "AWS_TOKEN" = "session_token",
"AWS_MAX_CONNECTIONS" = "50",
"AWS_REQUEST_TIMEOUT_MS" = "3000",
"AWS_CONNECTION_TIMEOUT_MS" = "1000",
@@ -70,6 +71,10 @@ suite("alter_policy") {
ALTER RESOURCE "${resource_name}"
PROPERTIES("AWS_REQUEST_TIMEOUT_MS" = "7777");
"""
+ def alter_result_succ_8 = try_sql """
+ ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_TOKEN" =
"new_session_token");
+ """
+
// errCode = 2, detailMessage = current not support modify property :
AWS_REGION
def alter_result_fail_1 = try_sql """
ALTER RESOURCE "${resource_name}" PROPERTIES("AWS_REGION" =
"8888");
@@ -112,6 +117,7 @@ suite("alter_policy") {
// [has_resouce_policy_alter, s3, AWS_REQUEST_TIMEOUT_MS, 7777],
// [has_resouce_policy_alter, s3, AWS_ROOT_PATH, path/to/rootaaaa],
// [has_resouce_policy_alter, s3, AWS_SECRET_KEY, ******],
+ // [has_resouce_policy_alter, s3, AWS_TOKEN, ******],
// [has_resouce_policy_alter, s3, id, {id}],
// [has_resouce_policy_alter, s3, type, s3]
// [has_resouce_policy_alter, s3, version, {version}]]
@@ -133,6 +139,8 @@ suite("alter_policy") {
assertEquals(show_alter_result[8][3], "10101010")
// AWS_SECRET_KEY
assertEquals(show_alter_result[9][3], "******")
+ // AWS_TOKEN
+ assertEquals(show_alter_result[10][3], "******")
}
def check_alter_resource_result_with_policy = { resource_name ->
@@ -151,6 +159,7 @@ suite("alter_policy") {
// [has_resouce_policy_alter, s3, AWS_REQUEST_TIMEOUT_MS, 7777],
// [has_resouce_policy_alter, s3, AWS_ROOT_PATH, path/to/rootaaaa],
// [has_resouce_policy_alter, s3, AWS_SECRET_KEY, ******],
+ // [has_resouce_policy_alter, s3, AWS_TOKEN, ******],
// [has_resouce_policy_alter, s3, id, {id}],
// [has_resouce_policy_alter, s3, type, s3]
// [has_resouce_policy_alter, s3, version, {version}]]
@@ -172,6 +181,8 @@ suite("alter_policy") {
assertEquals(show_alter_result[8][3], "path/to/rootaaaa")
// AWS_SECRET_KEY
assertEquals(show_alter_result[9][3], "******")
+ // AWS_TOKEN
+ assertEquals(show_alter_result[10][3], "******")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]