This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 28120ed99d7 branch-3.0 [opt](load) S3 Load and TVF support access without AKSK (#53592) (#54040)
28120ed99d7 is described below
commit 28120ed99d767591a9727dddeb792adde02279e1
Author: Xin Liao <[email protected]>
AuthorDate: Tue Aug 12 10:07:57 2025 +0800
    branch-3.0 [opt](load) S3 Load and TVF support access without AKSK (#53592) (#54040)
---
be/src/util/s3_util.cpp | 19 +++-
.../java/org/apache/doris/common/util/S3Util.java | 6 ++
.../datasource/property/PropertyConverter.java | 8 ++
.../datasource/property/S3ClientBEProperties.java | 7 ++
.../property/constants/S3Properties.java | 2 +-
.../doris/tablefunction/S3TableValuedFunction.java | 4 +-
.../broker_load/test_s3_load_without_aksk.out | Bin 0 -> 611 bytes
.../data/load_p0/tvf/test_tvf_without_aksk.out | Bin 0 -> 611 bytes
...t_domain_connection_and_ak_sk_correction.groovy | 55 ----------
.../broker_load/test_s3_load_without_aksk.groovy | 111 +++++++++++++++++++++
.../load_p0/tvf/test_tvf_without_aksk.groovy | 60 +++++++++++
11 files changed, 209 insertions(+), 63 deletions(-)
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index f6505583337..d80e668c05c 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -82,12 +82,17 @@ doris::Status is_s3_conf_valid(const S3ClientConf& conf) {
     }
     if (conf.role_arn.empty()) {
-        if (conf.ak.empty()) {
-            return Status::InvalidArgument<false>("Invalid s3 conf, empty ak");
-        }
-        if (conf.sk.empty()) {
+        // Allow anonymous access when both ak and sk are empty
+        bool hasAk = !conf.ak.empty();
+        bool hasSk = !conf.sk.empty();
+
+        // Either both credentials are provided or both are empty (anonymous access)
+        if (hasAk && conf.sk.empty()) {
             return Status::InvalidArgument<false>("Invalid s3 conf, empty sk");
         }
+        if (hasSk && conf.ak.empty()) {
+            return Status::InvalidArgument<false>("Invalid s3 conf, empty ak");
+        }
     }
     return Status::OK();
 }
@@ -282,6 +287,12 @@ std::shared_ptr<Aws::Auth::AWSCredentialsProvider> S3ClientFactory::get_aws_cred
                 s3_conf.role_arn, Aws::String(), s3_conf.external_id,
                 Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, stsClient);
     }
+
+    // Support anonymous access for public datasets when no credentials are provided
+    if (s3_conf.ak.empty() && s3_conf.sk.empty()) {
+        return std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
+    }
+
    return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
 }
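
The validity check above now enforces an all-or-nothing rule: AK and SK must be supplied together, and an empty pair means anonymous access rather than an error. A minimal sketch of the same rule in Java for reference (the helper name and exception type are illustrative, not Doris APIs):

    // Hypothetical mirror of is_s3_conf_valid(): accept a complete AK/SK
    // pair or none at all; reject a half-specified pair.
    static void validateAkSk(String ak, String sk) {
        boolean hasAk = ak != null && !ak.isEmpty();
        boolean hasSk = sk != null && !sk.isEmpty();
        if (hasAk && !hasSk) {
            throw new IllegalArgumentException("Invalid s3 conf, empty sk");
        }
        if (hasSk && !hasAk) {
            throw new IllegalArgumentException("Invalid s3 conf, empty ak");
        }
        // Both empty: fall through and treat the request as anonymous.
    }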
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index e204fab2e22..6ca4a99120b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
import org.apache.doris.common.credentials.CloudCredential;
import com.google.common.base.Strings;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -144,6 +145,11 @@ public class S3Util {
                 }
             }).build();
         }
+
+        // For anonymous access (no credentials required)
+        if (Strings.isNullOrEmpty(accessKey) && Strings.isNullOrEmpty(secretKey)) {
+            return AnonymousCredentialsProvider.create();
+        }
         return AwsCredentialsProviderChain.of(SystemPropertyCredentialsProvider.create(),
                 EnvironmentVariableCredentialsProvider.create(),
                 WebIdentityTokenFileCredentialsProvider.create(),
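
With this branch in place, an S3 client built from the returned provider sends unsigned requests and can only read objects whose bucket policy allows public access. A self-contained sketch of that behavior with the AWS SDK v2 (bucket name and region are placeholders):

    import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;

    public class AnonymousS3Demo {
        public static void main(String[] args) {
            // Anonymous clients sign nothing; any object that is not
            // publicly readable comes back as 403 Access Denied.
            try (S3Client s3 = S3Client.builder()
                    .region(Region.US_EAST_1)
                    .credentialsProvider(AnonymousCredentialsProvider.create())
                    .build()) {
                s3.listObjectsV2(ListObjectsV2Request.builder()
                        .bucket("some-public-bucket") // placeholder
                        .build())
                  .contents()
                  .forEach(obj -> System.out.println(obj.key()));
            }
        }
    }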
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
index aa22a83e1b0..e0b8520d5ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.obs.OBSConstants;
import org.apache.hadoop.fs.obs.OBSFileSystem;
+import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
@@ -318,6 +319,13 @@ public class PropertyConverter {
                     && !Strings.isNullOrEmpty(properties.get(S3Properties.EXTERNAL_ID))) {
                 LOG.warn("External ID is not supported for assumed role credential provider");
             }
+        } else if (Strings.isNullOrEmpty(credential.getAccessKey())
+                && Strings.isNullOrEmpty(credential.getSecretKey())) {
+            // if no access key and secret key, use anonymous credentials
+            s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, AnonymousAWSCredentialsProvider.class.getName());
+            // anonymous credentials should set max paging keys to avoid exceeding the default limit
+            // of 1000, which may cause issues with some S3-compatible services.
+            s3Properties.putIfAbsent(Constants.MAX_PAGING_KEYS, "1000");
         }
     }
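
The same mapping can be reproduced outside Doris to check that a bucket is reachable anonymously through S3A. A sketch using only documented fs.s3a.* keys (the s3a:// URI is a placeholder); the two conf.set calls are the same pairs PropertyConverter now emits:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AnonymousS3ADemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Constants.AWS_CREDENTIALS_PROVIDER -> anonymous provider
            conf.set("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider");
            // Constants.MAX_PAGING_KEYS, capped at the S3 default of 1000
            conf.set("fs.s3a.paging.maximum", "1000");
            try (FileSystem fs = FileSystem.get(URI.create("s3a://some-public-bucket/"), conf)) {
                for (FileStatus st : fs.listStatus(new Path("/"))) {
                    System.out.println(st.getPath());
                }
            }
        }
    }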
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
index c4f4c01b0b0..3a4e8649faa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java
@@ -48,6 +48,13 @@ public class S3ClientBEProperties {
                 || properties.containsKey(GCSProperties.ENDPOINT)
                 || properties.containsKey(CosProperties.ENDPOINT)) {
             return getBeAWSPropertiesFromS3(S3Properties.prefixToS3(properties));
+        } else if (properties.containsKey(S3Properties.Env.ENDPOINT)) {
+            if (!properties.containsKey(S3Properties.Env.REGION)) {
+                String endpoint = properties.get(S3Properties.Env.ENDPOINT);
+                String region = PropertyConverter.checkRegion(endpoint, properties.get(S3Properties.Env.REGION),
+                        S3Properties.Env.REGION);
+                properties.put(S3Properties.Env.REGION, region);
+            }
         }
         return properties;
     }
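
PropertyConverter.checkRegion itself is not part of this diff, so the sketch below is only a guess at the shape of the inference it performs; the regex and fallback are assumptions, not the actual implementation. The idea is to recover a region such as us-west-2 from an endpoint like s3.us-west-2.amazonaws.com when the caller supplied none:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical stand-in for PropertyConverter.checkRegion(); the real
    // method's third argument is a property-key name used for error
    // reporting, not a default region.
    static String inferRegion(String endpoint, String region, String fallback) {
        if (region != null && !region.isEmpty()) {
            return region; // an explicit region always wins
        }
        Matcher m = Pattern.compile("[a-z]{2}-[a-z]+-\\d+").matcher(endpoint);
        return m.find() ? m.group() : fallback;
    }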
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 82a71548b69..a727346e325 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -74,7 +74,7 @@ public class S3Properties extends BaseProperties {
     public static final String VALIDITY_CHECK = "s3_validity_check";
     public static final String PROVIDER = "provider";
     public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT);
-    public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList(ACCESS_KEY, SECRET_KEY);
+    public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList();
     public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN,
             ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 93dc28d84a3..3619821920e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -84,12 +84,10 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
                 // Azure could run without region
                 region = s3uri.getRegion().orElse("DUMMY-REGION");
             } else {
-                region = s3uri.getRegion().orElseThrow(() -> new AnalysisException(
-                        String.format("Properties '%s' is required.", S3Properties.REGION)));
+                region = PropertyConverter.checkRegion(endpoint, s3uri.getRegion().orElse(""), S3Properties.REGION);
             }
             otherProps.put(S3Properties.REGION, region);
         }
-        checkNecessaryS3Properties(otherProps);
         CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
                 getOrDefaultAndRemove(otherProps, S3Properties.REGION, ""),
                 getOrDefaultAndRemove(otherProps, S3Properties.ACCESS_KEY, ""),
diff --git a/regression-test/data/load_p0/broker_load/test_s3_load_without_aksk.out b/regression-test/data/load_p0/broker_load/test_s3_load_without_aksk.out
new file mode 100644
index 00000000000..8b0290edbdd
Binary files /dev/null and b/regression-test/data/load_p0/broker_load/test_s3_load_without_aksk.out differ
diff --git a/regression-test/data/load_p0/tvf/test_tvf_without_aksk.out b/regression-test/data/load_p0/tvf/test_tvf_without_aksk.out
new file mode 100644
index 00000000000..8b0290edbdd
Binary files /dev/null and b/regression-test/data/load_p0/tvf/test_tvf_without_aksk.out differ
diff --git a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
index 36073df4b07..dbefce7a8c3 100644
--- a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
@@ -105,61 +105,6 @@ suite("test_domain_connection_and_ak_sk_correction", "load_p0") {
         assertTrue(e.getMessage().contains("Failed to access object storage, message="), e.getMessage())
     }
-    label = UUID.randomUUID().toString().replace("-", "")
-    try {
-        result = sql """
-            LOAD LABEL ${label}
-            (
-                DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
-                INTO TABLE ${tableName}
-                COLUMNS TERMINATED BY "|"
-                (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
-            )
-            WITH S3
-            (
-                "AWS_ENDPOINT" = "${getS3Endpoint()}",
-                "AWS_ACCESS_KEY" = "${getS3AK()}1",
-                "AWS_SECRET_KEY" = "${getS3SK()}",
-                "AWS_REGION" = "${getS3Region()}",
-                "PROVIDER" = "${getS3Provider()}"
-            );
-        """
-        logger.info("the third sql result is {}", result)
-        assertTrue(false. "AK is wrong, so the correction of AKSK test should fale")
-    } catch (Exception e) {
-        logger.info("the third sql exception result is {}", e.getMessage())
-        assertTrue(e.getMessage().contains("Failed to access object storage, message="), e.getMessage())
-    }
-
-    label = UUID.randomUUID().toString().replace("-", "")
-    try {
-        result = sql """
-            LOAD LABEL ${label}
-            (
-                DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
-                INTO TABLE ${tableName}
-                COLUMNS TERMINATED BY "|"
-                (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp),
-                DATA INFILE("s3://${getS3BucketName()}1/regression/tpch/sf1/orders.tbl.1", "s3://${getS3BucketName()}/regression/tpch/sf1/orders.tbl.2")
-                INTO TABLE ${tableNameOrders}
-                COLUMNS TERMINATED BY "|"
-                (o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp)
-            )
-            WITH S3
-            (
-                "AWS_ENDPOINT" = "${getS3Endpoint()}",
-                "AWS_ACCESS_KEY" = "${getS3AK()}",
-                "AWS_SECRET_KEY" = "${getS3SK()}",
-                "AWS_REGION" = "${getS3Region()}",
-                "PROVIDER" = "${getS3Provider()}"
-            );
-        """
-        logger.info("the fourth sql result is {}", result)
-        assertTrue(false. "in the second DATA INFILE, the first bucket is wrong, so the sql should fail")
-    } catch (Exception e) {
-        logger.info("the fourth sql exception result is {}", e.getMessage())
-        assertTrue(e.getMessage().contains("Failed to access object storage, message="), e.getMessage())
-    }
     sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
     sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
 }
diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load_without_aksk.groovy b/regression-test/suites/load_p0/broker_load/test_s3_load_without_aksk.groovy
new file mode 100644
index 00000000000..60b39c03657
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load_without_aksk.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_s3_load_without_aksk", "load_p0") {
+ def tableName = "tbl_without_aksk"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            user_id BIGINT NOT NULL COMMENT "User ID",
+            name VARCHAR(20) COMMENT "User name",
+            age INT COMMENT "User age"
+        ) DUPLICATE KEY(user_id)
+        DISTRIBUTED BY HASH(user_id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+ def label = UUID.randomUUID().toString().replace("-", "0")
+
+    def sql_str = """
+        LOAD LABEL $label (
+            DATA INFILE("s3://${s3BucketName}/regression/load/data/example_0.csv")
+            INTO TABLE $tableName
+            COLUMNS TERMINATED BY ","
+        )
+        WITH S3 (
+            "AWS_ENDPOINT" = "${getS3Endpoint()}",
+            "PROVIDER" = "${getS3Provider()}"
+        )
+    """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+
+    def max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label)
+            break
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            def reason = result[0][7]
+            logger.info("load failed, reason:$reason")
+            assertTrue(1 == 2)
+            break
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if(max_try_milli_secs <= 0) {
+            assertTrue(1 == 2, "load Timeout: $label")
+        }
+    }
+
+ qt_sql """ SELECT * FROM ${tableName} order by user_id """
+
+ label = UUID.randomUUID().toString().replace("-", "0")
+
+    sql_str = """
+        LOAD LABEL $label (
+            DATA INFILE("s3://${s3BucketName}/regression/load/data/example_*.csv")
+            INTO TABLE $tableName
+            COLUMNS TERMINATED BY ","
+        )
+        WITH S3 (
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "PROVIDER" = "${getS3Provider()}"
+        )
+    """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+
+    max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label)
+            break
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            def reason = result[0][7]
+            logger.info("load failed, reason:$reason")
+            assertTrue(1 == 2)
+            break
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if(max_try_milli_secs <= 0) {
+            assertTrue(1 == 2, "load Timeout: $label")
+        }
+    }
+
+ qt_sql """ SELECT * FROM ${tableName} order by user_id """
+
+}
diff --git a/regression-test/suites/load_p0/tvf/test_tvf_without_aksk.groovy b/regression-test/suites/load_p0/tvf/test_tvf_without_aksk.groovy
new file mode 100644
index 00000000000..622fd5b8276
--- /dev/null
+++ b/regression-test/suites/load_p0/tvf/test_tvf_without_aksk.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_without_aksk", "load_p0") {
+ def tableName = "tbl_without_aksk"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            user_id BIGINT NOT NULL COMMENT "User ID",
+            name VARCHAR(20) COMMENT "User name",
+            age INT COMMENT "User age"
+        ) DUPLICATE KEY(user_id)
+        DISTRIBUTED BY HASH(user_id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+ def label = UUID.randomUUID().toString().replace("-", "0")
+
+    sql """
+        INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_0.csv",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "column_separator" = ",",
+            "format" = "csv"
+        );
+    """
+ qt_sql """ SELECT * FROM ${tableName} order by user_id """
+
+    sql """
+        INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_*.csv",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "column_separator" = ",",
+            "format" = "csv"
+        );
+    """
+ qt_sql """ SELECT * FROM ${tableName} order by user_id """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]