This is an automated email from the ASF dual-hosted git repository.
yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 715fc495ee [GLUTEN-7964][VL] Support S3 Bucket Config (#8123)
715fc495ee is described below
commit 715fc495ee4937b135d8967f723057281e12064d
Author: Deepak Majeti <[email protected]>
AuthorDate: Wed Jan 1 02:43:18 2025 -0500
[GLUTEN-7964][VL] Support S3 Bucket Config (#8123)
---
cpp/velox/utils/ConfigExtractor.cc | 170 ++++++++++++++++++++-----------------
1 file changed, 91 insertions(+), 79 deletions(-)
diff --git a/cpp/velox/utils/ConfigExtractor.cc
b/cpp/velox/utils/ConfigExtractor.cc
index 78366b3756..9f2f8e0a95 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -28,17 +28,6 @@ namespace {
const std::string kVeloxFileHandleCacheEnabled =
"spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
const bool kVeloxFileHandleCacheEnabledDefault = false;
-
-// Log granularity of AWS C++ SDK
-const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
-const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
-// Retry mode for AWS s3
-const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
-const std::string kVeloxS3RetryModeDefault = "legacy";
-// Connection timeout for AWS s3
-const std::string kVeloxS3ConnectTimeout =
"spark.gluten.velox.fs.s3a.connect.timeout";
-// Using default fs.s3a.connection.timeout value in hadoop
-const std::string kVeloxS3ConnectTimeoutDefault = "200s";
} // namespace
namespace gluten {
@@ -62,76 +51,99 @@ std::shared_ptr<facebook::velox::config::ConfigBase>
getHiveConfig(
std::unordered_map<std::string, std::string> hiveConfMap;
#ifdef ENABLE_S3
- std::string awsAccessKey =
conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
- std::string awsSecretKey =
conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
- std::string awsEndpoint =
conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
- bool sslEnabled =
conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
- bool pathStyleAccess =
conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
- bool useInstanceCredentials =
conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
- std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role",
"");
- std::string iamRoleSessionName =
conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
- std::string retryMaxAttempts =
conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
- std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode,
kVeloxS3RetryModeDefault);
- std::string maxConnections =
conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
- std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout,
kVeloxS3ConnectTimeoutDefault);
-
- std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel,
kVeloxAwsSdkLogLevelDefault);
-
- const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
- if (envAwsAccessKey != nullptr) {
- awsAccessKey = std::string(envAwsAccessKey);
- }
- const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
- if (envAwsSecretKey != nullptr) {
- awsSecretKey = std::string(envAwsSecretKey);
- }
- const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
- if (envAwsEndpoint != nullptr) {
- awsEndpoint = std::string(envAwsEndpoint);
- }
- const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
- if (envRetryMaxAttempts != nullptr) {
- retryMaxAttempts = std::string(envRetryMaxAttempts);
- }
- const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
- if (envRetryMode != nullptr) {
- retryMode = std::string(envRetryMode);
- }
-
- if (useInstanceCredentials) {
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-
facebook::velox::filesystems::S3Config::Keys::kUseInstanceCredentials)] =
"true";
- } else if (!iamRole.empty()) {
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kIamRole)] = iamRole;
- if (!iamRoleSessionName.empty()) {
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kIamRoleSessionName)]
= iamRoleSessionName;
+ using namespace facebook::velox::filesystems;
+ std::string_view kSparkHadoopPrefix = "spark.hadoop.fs.s3a.";
+ std::string_view kSparkHadoopBucketPrefix = "spark.hadoop.fs.s3a.bucket.";
+
+ // Log granularity of AWS C++ SDK
+ const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
+ const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
+
+ const std::unordered_map<S3Config::Keys, std::pair<std::string,
std::optional<std::string>>> sparkSuffixes = {
+ {S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
+ {S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
+ {S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
+ {S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled",
"false")},
+ {S3Config::Keys::kPathStyleAccess, std::make_pair("path.style.access",
"false")},
+ {S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit",
std::nullopt)},
+ {S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
+ {S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum",
"15")},
+ {S3Config::Keys::kConnectTimeout, std::make_pair("connection.timeout",
"200s")},
+ {S3Config::Keys::kUseInstanceCredentials,
std::make_pair("instance.credentials", "false")},
+ {S3Config::Keys::kIamRole, std::make_pair("iam.role", std::nullopt)},
+ {S3Config::Keys::kIamRoleSessionName,
std::make_pair("iam.role.session.name", "gluten-session")},
+ };
+
+ // Map a Spark config suffix to its Velox S3 config key (nullopt if unknown).
+ auto getVeloxKey = [&](std::string_view suffix) {
+ for (const auto& [key, value] : sparkSuffixes) {
+ if (value.first == suffix) {
+ return std::optional<S3Config::Keys>(key);
+ }
+ }
+ return std::optional<S3Config::Keys>(std::nullopt);
+ };
+
+ auto sparkBaseConfigValue = [&](S3Config::Keys key) {
+ std::stringstream ss;
+ auto keyValue = sparkSuffixes.find(key)->second;
+ ss << kSparkHadoopPrefix << keyValue.first;
+ auto sparkKey = ss.str();
+ if (conf->valueExists(sparkKey)) {
+ return
static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
+ }
+ // Key not set in the Spark conf: fall back to the table default (may be nullopt).
+ return keyValue.second;
+ };
+
+ auto setConfigIfPresent = [&](S3Config::Keys key) {
+ auto sparkConfig = sparkBaseConfigValue(key);
+ if (sparkConfig.has_value()) {
+ hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
+ }
+ };
+
+ auto setFromEnvOrConfigIfPresent = [&](std::string_view envName,
S3Config::Keys key) {
+ const char* envValue = std::getenv(envName.data());
+ if (envValue != nullptr) {
+ hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
+ } else {
+ setConfigIfPresent(key);
+ }
+ };
+
+ setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
+ setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS",
S3Config::Keys::kMaxAttempts);
+ setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
+ setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
+ setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY",
S3Config::Keys::kSecretKey);
+ setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
+ setConfigIfPresent(S3Config::Keys::kIamRole);
+ setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
+ setConfigIfPresent(S3Config::Keys::kSSLEnabled);
+ setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
+ setConfigIfPresent(S3Config::Keys::kMaxConnections);
+ setConfigIfPresent(S3Config::Keys::kConnectTimeout);
+
+ hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
+ conf->get<std::string>(kVeloxAwsSdkLogLevel,
kVeloxAwsSdkLogLevelDefault);
+ ;
+
+ // Convert all Spark bucket configs to Velox bucket configs.
+ for (const auto& [key, value] : conf->rawConfigs()) {
+ if (key.find(kSparkHadoopBucketPrefix) == 0) {
+ std::string_view skey = key;
+ auto remaining = skey.substr(kSparkHadoopBucketPrefix.size());
+ int dot = remaining.find(".");
+ auto bucketName = remaining.substr(0, dot);
+ auto suffix = remaining.substr(dot + 1);
+ auto veloxKey = getVeloxKey(suffix);
+
+ if (veloxKey.has_value()) {
+ hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] =
value;
+ }
}
- } else {
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kAccessKey)] =
awsAccessKey;
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kSecretKey)] =
awsSecretKey;
- }
- // Only need to set s3 endpoint when not use instance credentials.
- if (!useInstanceCredentials) {
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kEndpoint)] =
awsEndpoint;
}
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kSSLEnabled)] = sslEnabled
? "true" : "false";
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kPathStyleAccess)] =
pathStyleAccess ? "true" : "false";
- hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
awsSdkLogLevel;
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kMaxAttempts)] =
retryMaxAttempts;
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kRetryMode)] = retryMode;
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kMaxConnections)] =
maxConnections;
- hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
- facebook::velox::filesystems::S3Config::Keys::kConnectTimeout)] =
connectTimeout;
#endif
#ifdef ENABLE_GCS
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]