This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 95cb544b3d8 [Enhancement](s3-load) Add domain connection and aksk correction check for S3 load (#36711)
95cb544b3d8 is described below
commit 95cb544b3d856f1beb07b51a5704f634840b6fa3
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jul 2 21:44:24 2024 +0800
[Enhancement](s3-load) Add domain connection and aksk correction check for S3 load (#36711)
Add a domain connection check and an AK/SK correctness check for S3 load jobs before actual execution.
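
For context, the new pre-check boils down to issuing a cheap HEAD and LIST
against the configured object store and failing the load early if either call
errors out. Below is a minimal, illustrative sketch of that probe written
against the AWS SDK for Java v2 rather than the internal RemoteBase
abstraction this patch actually uses; the endpoint, region, bucket, and key
values are hypothetical placeholders:

    import java.net.URI;

    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.S3Exception;

    public class S3PreflightProbe {
        // Probes connectivity and credentials; throws on a bad endpoint or bad AK/SK.
        static void probe(String endpoint, String region, String ak, String sk, String bucket) {
            try (S3Client s3 = S3Client.builder()
                    .endpointOverride(URI.create("https://" + endpoint))
                    .region(Region.of(region))
                    .credentialsProvider(StaticCredentialsProvider.create(
                            AwsBasicCredentials.create(ak, sk)))
                    .build()) {
                try {
                    // HEAD on an arbitrary key: a 404 means the key is absent but the
                    // credentials work; a 403 or a connect failure means trouble.
                    // (RemoteBase#headObject swallows the missing-key case itself.)
                    s3.headObject(HeadObjectRequest.builder().bucket(bucket).key("1").build());
                } catch (S3Exception e) {
                    if (e.statusCode() != 404) {
                        throw e;
                    }
                }
                // LIST also exercises the AK/SK and DNS resolution of the endpoint.
                s3.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).maxKeys(1).build());
            }
        }

        public static void main(String[] args) {
            // Hypothetical values for illustration only.
            probe("s3.us-east-1.amazonaws.com", "us-east-1", "AK", "SK", "example-bucket");
        }
    }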
---
.../java/org/apache/doris/analysis/LoadStmt.java | 88 +++++++++--
.../property/constants/S3Properties.java | 1 +
...t_domain_connection_and_ak_sk_correction.groovy | 161 +++++++++++++++++++++
3 files changed, 241 insertions(+), 9 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index d8d515fe6a4..1990078b46c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,10 +21,14 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
import org.apache.doris.cloud.security.SecurityChecker;
+import org.apache.doris.cloud.storage.RemoteBase;
+import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
@@ -500,7 +504,7 @@ public class LoadStmt extends DdlStmt {
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
- checkWhiteList();
+ checkS3Param();
} else if (isMysqlLoad) {
etlJobType = EtlJobType.LOCAL_FILE;
} else {
@@ -518,6 +522,26 @@ public class LoadStmt extends DdlStmt {
user = ConnectContext.get().getQualifiedUser();
}
+
+ private String getProviderFromEndpoint() {
+ Map<String, String> properties = brokerDesc.getProperties();
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
+ return entry.getValue();
+ }
+ }
+ return S3Properties.S3_PROVIDER;
+ }
+
+ private String getBucketFromFilePath(String filePath) throws Exception {
+ String[] parts = filePath.split("\\/\\/");
+ if (parts.length < 2) {
+ throw new Exception("filePath is not valid");
+ }
+ String bucket = parts[1].split("\\/")[0];
+ return bucket;
+ }
+
public String getComment() {
return comment;
}
@@ -597,7 +621,7 @@ public class LoadStmt extends DdlStmt {
}
}
- public void checkWhiteList() throws UserException {
+ public void checkS3Param() throws UserException {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
&& brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
@@ -606,17 +630,63 @@ public class LoadStmt extends DdlStmt {
String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
endpoint = endpoint.replaceFirst("^http://", "");
endpoint = endpoint.replaceFirst("^https://", "");
- List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
- whiteList.removeIf(String::isEmpty);
- if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
- throw new UserException("endpoint: " + endpoint
- + " is not in s3 load endpoint white list: " +
String.join(",", whiteList));
- }
brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
- if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
+ checkWhiteList(endpoint);
+ if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
return;
}
checkEndpoint(endpoint);
+ checkAkSk();
+ }
+ }
+
+ public void checkWhiteList(String endpoint) throws UserException {
+ List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
+ whiteList.removeIf(String::isEmpty);
+ if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
+ throw new UserException("endpoint: " + endpoint
+ + " is not in s3 load endpoint white list: " +
String.join(",", whiteList));
}
}
+
+ private void checkAkSk() throws UserException {
+ RemoteBase remote = null;
+ ObjectInfo objectInfo = null;
+ try {
+ Map<String, String> brokerDescProperties = brokerDesc.getProperties();
+ String provider = getProviderFromEndpoint();
+ for (DataDescription dataDescription : dataDescriptions) {
+ for (String filePath : dataDescription.getFilePaths()) {
+ String bucket = getBucketFromFilePath(filePath);
+ objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
+ brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
+ brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
+ bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
+ brokerDescProperties.get(S3Properties.Env.REGION), "");
+ remote = RemoteBase.newInstance(objectInfo);
+ // RemoteBase#headObject does not throw exception if key does not exist.
+ remote.headObject("1");
+ remote.listObjects(null);
+ remote.close();
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to check object info={}", objectInfo, e);
+ String message = e.getMessage();
+ if (message != null) {
+ int index = message.indexOf("Error message=");
+ if (index != -1) {
+ message = message.substring(index);
+ }
+ }
+ throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+ "Incorrect object storage info, " + message);
+ } finally {
+ if (remote != null) {
+ remote.close();
+ }
+ }
+
+ }
+
}
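
As an aside, getBucketFromFilePath() above recovers the bucket by splitting
on "//". For readers who prefer standard parsing, the same value falls out of
java.net.URI, since the authority component of an s3:// URL is the bucket
name. A hypothetical, equivalent sketch (it assumes DNS-compatible bucket
names, as getHost() returns null for names that are not valid hostnames):

    import java.net.URI;

    public class BucketFromPath {
        static String bucketOf(String filePath) throws Exception {
            // For "s3://my-bucket/regression/tpch/sf1/part.tbl", getHost() is "my-bucket".
            String bucket = URI.create(filePath).getHost();
            if (bucket == null || bucket.isEmpty()) {
                throw new Exception("filePath is not valid: " + filePath);
            }
            return bucket;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(bucketOf("s3://my-bucket/regression/tpch/sf1/part.tbl")); // my-bucket
        }
    }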
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index d1b3b17e2da..a0ef74c7a96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -58,6 +58,7 @@ public class S3Properties extends BaseProperties {
public static final String MAX_CONNECTIONS = "s3.connection.maximum";
public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout";
public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
+ public static final String S3_PROVIDER = "S3";
// required by storage policy
public static final String ROOT_PATH = "s3.root.path";
diff --git a/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
new file mode 100644
index 00000000000..889da246d3b
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_domain_connection_and_ak_sk_correction.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_domain_connection_and_ak_sk_correction", "load_p0") {
+ // create table
+ def tableName = 'test_domain_connection_and_ak_sk_correction'
+ def tableNameOrders = 'test_domain_connection_and_ak_sk_correction_orders'
+ sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+ sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ P_PARTKEY INTEGER NOT NULL,
+ P_NAME VARCHAR(55) NOT NULL,
+ P_MFGR CHAR(25) NOT NULL,
+ P_BRAND CHAR(10) NOT NULL,
+ P_TYPE VARCHAR(25) NOT NULL,
+ P_SIZE INTEGER NOT NULL,
+ P_CONTAINER CHAR(10) NOT NULL,
+ P_RETAILPRICE DECIMAL(15,2) NOT NULL,
+ P_COMMENT VARCHAR(23) NOT NULL
+ )
+ DUPLICATE KEY(P_PARTKEY, P_NAME)
+ DISTRIBUTED BY HASH(P_PARTKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableNameOrders} (
+ O_ORDERKEY INTEGER NOT NULL,
+ O_CUSTKEY INTEGER NOT NULL,
+ O_ORDERSTATUS CHAR(1) NOT NULL,
+ O_TOTALPRICE DECIMAL(15,2) NOT NULL,
+ O_ORDERDATE DATE NOT NULL,
+ O_ORDERPRIORITY CHAR(15) NOT NULL,
+ O_CLERK CHAR(15) NOT NULL,
+ O_SHIPPRIORITY INTEGER NOT NULL,
+ O_COMMENT VARCHAR(79) NOT NULL
+ )
+ DUPLICATE KEY(O_ORDERKEY, O_CUSTKEY)
+ DISTRIBUTED BY HASH(O_ORDERKEY) BUCKETS 32
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+
+ def label = UUID.randomUUID().toString().replace("-", "")
+ def result = sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "|"
+ (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+ )
+ WITH S3
+ (
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_REGION" = "${getS3Region()}"
+ );
+ """
+ logger.info("the first sql result is {}", result)
+
+ label = UUID.randomUUID().toString().replace("-", "")
+ try {
+ result = sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "|"
+ (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+ )
+ WITH S3
+ (
+ "AWS_ENDPOINT" = "${getS3Endpoint()}1",
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_REGION" = "${getS3Region()}"
+ );
+ """
+ logger.info("the second sql result is {}", result)
+ assertTrue(false. "The endpoint is wrong, so the connection test
should fale")
+ } catch (Exception e) {
+ logger.info("the second sql exception result is {}", e.getMessage())
+ assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
+ }
+
+ label = UUID.randomUUID().toString().replace("-", "")
+ try {
+ result = sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "|"
+ (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
+ )
+ WITH S3
+ (
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_ACCESS_KEY" = "${getS3AK()}1",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_REGION" = "${getS3Region()}"
+ );
+ """
+ logger.info("the third sql result is {}", result)
+ assertTrue(false. "AK is wrong, so the correction of AKSK test should
fale")
+ } catch (Exception e) {
+ logger.info("the third sql exception result is {}", e.getMessage())
+ assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
+ }
+
+ label = UUID.randomUUID().toString().replace("-", "")
+ try {
+ result = sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/part.tbl")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "|"
+ (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp),
+ DATA INFILE("s3://${getS3BucketName()}1/regression/tpch/sf1/orders.tbl.1", "s3://${getS3BucketName()}/regression/tpch/sf1/orders.tbl.2")
+ INTO TABLE ${tableNameOrders}
+ COLUMNS TERMINATED BY "|"
+ (o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp)
+ )
+ WITH S3
+ (
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_REGION" = "${getS3Region()}"
+ );
+ """
+ logger.info("the fourth sql result is {}", result)
+ assertTrue(false. "in the second DATA INFILE, the first bucket is
wrong, so the sql should fail")
+ } catch (Exception e) {
+ logger.info("the fourth sql exception result is {}", e.getMessage())
+ assertTrue(e.getMessage().contains("Incorrect object storage info"), e.getMessage())
+ }
+ sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+ sql """ DROP TABLE IF EXISTS ${tableNameOrders} FORCE"""
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]