This is an automated email from the ASF dual-hosted git repository.

yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 715fc495ee [GLUTEN-7964][VL] Support S3 Bucket Config (#8123)
715fc495ee is described below

commit 715fc495ee4937b135d8967f723057281e12064d
Author: Deepak Majeti <[email protected]>
AuthorDate: Wed Jan 1 02:43:18 2025 -0500

    [GLUTEN-7964][VL] Support S3 Bucket Config (#8123)
---
 cpp/velox/utils/ConfigExtractor.cc | 170 ++++++++++++++++++++-----------------
 1 file changed, 91 insertions(+), 79 deletions(-)

diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc
index 78366b3756..9f2f8e0a95 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -28,17 +28,6 @@ namespace {
 
 const std::string kVeloxFileHandleCacheEnabled = 
"spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled";
 const bool kVeloxFileHandleCacheEnabledDefault = false;
-
-// Log granularity of AWS C++ SDK
-const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
-const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
-// Retry mode for AWS s3
-const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
-const std::string kVeloxS3RetryModeDefault = "legacy";
-// Connection timeout for AWS s3
-const std::string kVeloxS3ConnectTimeout = 
"spark.gluten.velox.fs.s3a.connect.timeout";
-// Using default fs.s3a.connection.timeout value in hadoop
-const std::string kVeloxS3ConnectTimeoutDefault = "200s";
 } // namespace
 
 namespace gluten {
@@ -62,76 +51,99 @@ std::shared_ptr<facebook::velox::config::ConfigBase> getHiveConfig(
   std::unordered_map<std::string, std::string> hiveConfMap;
 
 #ifdef ENABLE_S3
-  std::string awsAccessKey = 
conf->get<std::string>("spark.hadoop.fs.s3a.access.key", "");
-  std::string awsSecretKey = 
conf->get<std::string>("spark.hadoop.fs.s3a.secret.key", "");
-  std::string awsEndpoint = 
conf->get<std::string>("spark.hadoop.fs.s3a.endpoint", "");
-  bool sslEnabled = 
conf->get<bool>("spark.hadoop.fs.s3a.connection.ssl.enabled", false);
-  bool pathStyleAccess = 
conf->get<bool>("spark.hadoop.fs.s3a.path.style.access", false);
-  bool useInstanceCredentials = 
conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
-  std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", 
"");
-  std::string iamRoleSessionName = 
conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
-  std::string retryMaxAttempts = 
conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
-  std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode, 
kVeloxS3RetryModeDefault);
-  std::string maxConnections = 
conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
-  std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout, 
kVeloxS3ConnectTimeoutDefault);
-
-  std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, 
kVeloxAwsSdkLogLevelDefault);
-
-  const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
-  if (envAwsAccessKey != nullptr) {
-    awsAccessKey = std::string(envAwsAccessKey);
-  }
-  const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
-  if (envAwsSecretKey != nullptr) {
-    awsSecretKey = std::string(envAwsSecretKey);
-  }
-  const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
-  if (envAwsEndpoint != nullptr) {
-    awsEndpoint = std::string(envAwsEndpoint);
-  }
-  const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
-  if (envRetryMaxAttempts != nullptr) {
-    retryMaxAttempts = std::string(envRetryMaxAttempts);
-  }
-  const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
-  if (envRetryMode != nullptr) {
-    retryMode = std::string(envRetryMode);
-  }
-
-  if (useInstanceCredentials) {
-    hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-        
facebook::velox::filesystems::S3Config::Keys::kUseInstanceCredentials)] = 
"true";
-  } else if (!iamRole.empty()) {
-    hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-        facebook::velox::filesystems::S3Config::Keys::kIamRole)] = iamRole;
-    if (!iamRoleSessionName.empty()) {
-      hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-          facebook::velox::filesystems::S3Config::Keys::kIamRoleSessionName)] 
= iamRoleSessionName;
+  using namespace facebook::velox::filesystems;
+  std::string_view kSparkHadoopPrefix = "spark.hadoop.fs.s3a.";
+  std::string_view kSparkHadoopBucketPrefix = "spark.hadoop.fs.s3a.bucket.";
+
+  // Log granularity of AWS C++ SDK
+  const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
+  const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
+
+  const std::unordered_map<S3Config::Keys, std::pair<std::string, 
std::optional<std::string>>> sparkSuffixes = {
+      {S3Config::Keys::kAccessKey, std::make_pair("access.key", std::nullopt)},
+      {S3Config::Keys::kSecretKey, std::make_pair("secret.key", std::nullopt)},
+      {S3Config::Keys::kEndpoint, std::make_pair("endpoint", std::nullopt)},
+      {S3Config::Keys::kSSLEnabled, std::make_pair("connection.ssl.enabled", 
"false")},
+      {S3Config::Keys::kPathStyleAccess, std::make_pair("path.style.access", 
"false")},
+      {S3Config::Keys::kMaxAttempts, std::make_pair("retry.limit", 
std::nullopt)},
+      {S3Config::Keys::kRetryMode, std::make_pair("retry.mode", "legacy")},
+      {S3Config::Keys::kMaxConnections, std::make_pair("connection.maximum", 
"15")},
+      {S3Config::Keys::kConnectTimeout, std::make_pair("connection.timeout", 
"200s")},
+      {S3Config::Keys::kUseInstanceCredentials, 
std::make_pair("instance.credentials", "false")},
+      {S3Config::Keys::kIamRole, std::make_pair("iam.role", std::nullopt)},
+      {S3Config::Keys::kIamRoleSessionName, 
std::make_pair("iam.role.session.name", "gluten-session")},
+  };
+
+  // get Velox S3 config key from Spark Suffix.
+  auto getVeloxKey = [&](std::string_view suffix) {
+    for (const auto& [key, value] : sparkSuffixes) {
+      if (value.first == suffix) {
+        return std::optional<S3Config::Keys>(key);
+      }
+    }
+    return std::optional<S3Config::Keys>(std::nullopt);
+  };
+
+  auto sparkBaseConfigValue = [&](S3Config::Keys key) {
+    std::stringstream ss;
+    auto keyValue = sparkSuffixes.find(key)->second;
+    ss << kSparkHadoopPrefix << keyValue.first;
+    auto sparkKey = ss.str();
+    if (conf->valueExists(sparkKey)) {
+      return 
static_cast<std::optional<std::string>>(conf->get<std::string>(sparkKey));
+    }
+    // Return default value.
+    return keyValue.second;
+  };
+
+  auto setConfigIfPresent = [&](S3Config::Keys key) {
+    auto sparkConfig = sparkBaseConfigValue(key);
+    if (sparkConfig.has_value()) {
+      hiveConfMap[S3Config::baseConfigKey(key)] = sparkConfig.value();
+    }
+  };
+
+  auto setFromEnvOrConfigIfPresent = [&](std::string_view envName, 
S3Config::Keys key) {
+    const char* envValue = std::getenv(envName.data());
+    if (envValue != nullptr) {
+      hiveConfMap[S3Config::baseConfigKey(key)] = std::string(envValue);
+    } else {
+      setConfigIfPresent(key);
+    }
+  };
+
+  setFromEnvOrConfigIfPresent("AWS_ENDPOINT", S3Config::Keys::kEndpoint);
+  setFromEnvOrConfigIfPresent("AWS_MAX_ATTEMPTS", 
S3Config::Keys::kMaxAttempts);
+  setFromEnvOrConfigIfPresent("AWS_RETRY_MODE", S3Config::Keys::kRetryMode);
+  setFromEnvOrConfigIfPresent("AWS_ACCESS_KEY_ID", S3Config::Keys::kAccessKey);
+  setFromEnvOrConfigIfPresent("AWS_SECRET_ACCESS_KEY", 
S3Config::Keys::kSecretKey);
+  setConfigIfPresent(S3Config::Keys::kUseInstanceCredentials);
+  setConfigIfPresent(S3Config::Keys::kIamRole);
+  setConfigIfPresent(S3Config::Keys::kIamRoleSessionName);
+  setConfigIfPresent(S3Config::Keys::kSSLEnabled);
+  setConfigIfPresent(S3Config::Keys::kPathStyleAccess);
+  setConfigIfPresent(S3Config::Keys::kMaxConnections);
+  setConfigIfPresent(S3Config::Keys::kConnectTimeout);
+
+  hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] =
+      conf->get<std::string>(kVeloxAwsSdkLogLevel, 
kVeloxAwsSdkLogLevelDefault);
+  ;
+
+  // Convert all Spark bucket configs to Velox bucket configs.
+  for (const auto& [key, value] : conf->rawConfigs()) {
+    if (key.find(kSparkHadoopBucketPrefix) == 0) {
+      std::string_view skey = key;
+      auto remaining = skey.substr(kSparkHadoopBucketPrefix.size());
+      int dot = remaining.find(".");
+      auto bucketName = remaining.substr(0, dot);
+      auto suffix = remaining.substr(dot + 1);
+      auto veloxKey = getVeloxKey(suffix);
+
+      if (veloxKey.has_value()) {
+        hiveConfMap[S3Config::bucketConfigKey(veloxKey.value(), bucketName)] = 
value;
+      }
     }
-  } else {
-    hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-        facebook::velox::filesystems::S3Config::Keys::kAccessKey)] = 
awsAccessKey;
-    hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-        facebook::velox::filesystems::S3Config::Keys::kSecretKey)] = 
awsSecretKey;
-  }
-  // Only need to set s3 endpoint when not use instance credentials.
-  if (!useInstanceCredentials) {
-    hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-        facebook::velox::filesystems::S3Config::Keys::kEndpoint)] = 
awsEndpoint;
   }
-  hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-      facebook::velox::filesystems::S3Config::Keys::kSSLEnabled)] = sslEnabled 
? "true" : "false";
-  hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-      facebook::velox::filesystems::S3Config::Keys::kPathStyleAccess)] = 
pathStyleAccess ? "true" : "false";
-  hiveConfMap[facebook::velox::filesystems::S3Config::kS3LogLevel] = 
awsSdkLogLevel;
-  hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-      facebook::velox::filesystems::S3Config::Keys::kMaxAttempts)] = 
retryMaxAttempts;
-  hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-      facebook::velox::filesystems::S3Config::Keys::kRetryMode)] = retryMode;
-  hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-      facebook::velox::filesystems::S3Config::Keys::kMaxConnections)] = 
maxConnections;
-  hiveConfMap[facebook::velox::filesystems::S3Config::baseConfigKey(
-      facebook::velox::filesystems::S3Config::Keys::kConnectTimeout)] = 
connectTimeout;
 #endif
 
 #ifdef ENABLE_GCS


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to