This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new cf04f0fe3 [GLUTEN-5659][VL] Add more configs for AWS s3 (#5660)
cf04f0fe3 is described below

commit cf04f0fe3e338169492a28a35cb3562d5d29cdaa
Author: Yan Ma <[email protected]>
AuthorDate: Tue Jun 25 10:34:50 2024 +0800

    [GLUTEN-5659][VL] Add more configs for AWS s3 (#5660)
    
    Add more configs for AWS s3
    
    spark.gluten.velox.fs.s3a.retry.mode
    spark.gluten.velox.fs.s3a.connect.timeout
    spark.hadoop.fs.s3a.retry.limit
    spark.hadoop.fs.s3a.connection.maximum
---
 cpp/velox/utils/ConfigExtractor.cc                 | 23 +++++++++++++++++
 docs/Configuration.md                              |  2 ++
 .../scala/org/apache/gluten/GlutenConfig.scala     | 30 ++++++++++++++++++++++
 3 files changed, 55 insertions(+)

diff --git a/cpp/velox/utils/ConfigExtractor.cc 
b/cpp/velox/utils/ConfigExtractor.cc
index a71f14322..816166351 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -34,6 +34,13 @@ const bool kVeloxFileHandleCacheEnabledDefault = false;
 // Log granularity of AWS C++ SDK
 const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
 const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
+// Retry mode for AWS s3
+const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
+const std::string kVeloxS3RetryModeDefault = "legacy";
+// Connection timeout for AWS s3
+const std::string kVeloxS3ConnectTimeout = 
"spark.gluten.velox.fs.s3a.connect.timeout";
+// Using default fs.s3a.connection.timeout value in hadoop
+const std::string kVeloxS3ConnectTimeoutDefault = "200s";
 } // namespace
 
 namespace gluten {
@@ -64,6 +71,10 @@ std::shared_ptr<facebook::velox::core::MemConfig> 
getHiveConfig(std::shared_ptr<
   bool useInstanceCredentials = 
conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
   std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role", 
"");
   std::string iamRoleSessionName = 
conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
+  std::string retryMaxAttempts = 
conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
+  std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode, 
kVeloxS3RetryModeDefault);
+  std::string maxConnections = 
conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
+  std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout, 
kVeloxS3ConnectTimeoutDefault);
 
   std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel, 
kVeloxAwsSdkLogLevelDefault);
 
@@ -79,6 +90,14 @@ std::shared_ptr<facebook::velox::core::MemConfig> 
getHiveConfig(std::shared_ptr<
   if (envAwsEndpoint != nullptr) {
     awsEndpoint = std::string(envAwsEndpoint);
   }
+  const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
+  if (envRetryMaxAttempts != nullptr) {
+    retryMaxAttempts = std::string(envRetryMaxAttempts);
+  }
+  const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
+  if (envRetryMode != nullptr) {
+    retryMode = std::string(envRetryMode);
+  }
 
   if (useInstanceCredentials) {
     
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials]
 = "true";
@@ -98,6 +117,10 @@ std::shared_ptr<facebook::velox::core::MemConfig> 
getHiveConfig(std::shared_ptr<
   hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled] = 
sslEnabled ? "true" : "false";
   
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess] = 
pathStyleAccess ? "true" : "false";
   hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3LogLevel] = 
awsSdkLogLevel;
+  hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxAttempts] = 
retryMaxAttempts;
+  hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3RetryMode] = 
retryMode;
+  hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxConnections] 
= maxConnections;
+  hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3ConnectTimeout] 
= connectTimeout;
 #endif
 
 #ifdef ENABLE_GCS
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 089675286..2c2bd4de1 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -89,6 +89,8 @@ The following configurations are related to Velox settings.
 | spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes            | Set 
the max coalesced bytes for velox file scan.                                    
                                                               |                
   |
 | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct          | Set 
prefetch cache min pct for velox file scan.                                     
                                                               |                
   |
 | spark.gluten.velox.awsSdkLogLevel                                    | Log 
granularity of AWS C++ SDK in velox.                                            
                                                               | FATAL          
   |
+| spark.gluten.velox.fs.s3a.retry.mode                                 | Retry 
mode for AWS s3 connection error, can be "legacy", "standard" and "adaptive".   
                                                             | legacy           
 |
+| spark.gluten.velox.fs.s3a.connect.timeout                            | 
Timeout for AWS s3 connection.                                                  
                                                                   | 200s       
       |
 | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled             | 
Enable velox orc scan. If disabled, vanilla spark orc scan will be used.        
                                                                   | true       
       |
 | spark.gluten.sql.complexType.scan.fallback.enabled                   | Force 
fallback for complex type scan, including struct, map, array.                   
                                                             | true             
 |
 
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 4b4e29e7d..cc2d6ac5f 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -436,6 +436,10 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)
 
+  def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE)
+
+  def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT)
+
   def enableCastAvgAggregateFunction: Boolean = 
conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
 
   def enableGlutenCostEvaluator: Boolean = conf.getConf(COST_EVALUATOR_ENABLED)
@@ -488,6 +492,10 @@ object GlutenConfig {
   val SPARK_S3_IAM: String = HADOOP_PREFIX + S3_IAM_ROLE
   val S3_IAM_ROLE_SESSION_NAME = "fs.s3a.iam.role.session.name"
   val SPARK_S3_IAM_SESSION_NAME: String = HADOOP_PREFIX + 
S3_IAM_ROLE_SESSION_NAME
+  val S3_RETRY_MAX_ATTEMPTS = "fs.s3a.retry.limit"
+  val SPARK_S3_RETRY_MAX_ATTEMPTS: String = HADOOP_PREFIX + 
S3_RETRY_MAX_ATTEMPTS
+  val S3_CONNECTION_MAXIMUM = "fs.s3a.connection.maximum"
+  val SPARK_S3_CONNECTION_MAXIMUM: String = HADOOP_PREFIX + 
S3_CONNECTION_MAXIMUM
 
   // Hardware acceleraters backend
   val GLUTEN_SHUFFLE_CODEC_BACKEND = 
"spark.gluten.sql.columnar.shuffle.codecBackend"
@@ -642,6 +650,10 @@ object GlutenConfig {
       SPARK_S3_USE_INSTANCE_CREDENTIALS,
       SPARK_S3_IAM,
       SPARK_S3_IAM_SESSION_NAME,
+      SPARK_S3_RETRY_MAX_ATTEMPTS,
+      SPARK_S3_CONNECTION_MAXIMUM,
+      AWS_S3_CONNECT_TIMEOUT.key,
+      AWS_S3_RETRY_MODE.key,
       AWS_SDK_LOG_LEVEL.key,
       // gcs config
       SPARK_GCS_STORAGE_ROOT_URL,
@@ -693,6 +705,10 @@ object GlutenConfig {
       (SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
       (SPARK_S3_IAM, ""),
       (SPARK_S3_IAM_SESSION_NAME, ""),
+      (SPARK_S3_RETRY_MAX_ATTEMPTS, "20"),
+      (SPARK_S3_CONNECTION_MAXIMUM, "15"),
+      (AWS_S3_CONNECT_TIMEOUT.key, AWS_S3_CONNECT_TIMEOUT.defaultValueString),
+      (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
       (
         COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
         conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
@@ -1941,6 +1957,20 @@ object GlutenConfig {
       .stringConf
       .createWithDefault("FATAL")
 
+  val AWS_S3_RETRY_MODE =
+    buildConf("spark.gluten.velox.fs.s3a.retry.mode")
+      .internal()
+      .doc("Retry mode for AWS s3 connection error: legacy, standard and 
adaptive.")
+      .stringConf
+      .createWithDefault("legacy")
+
+  val AWS_S3_CONNECT_TIMEOUT =
+    buildConf("spark.gluten.velox.fs.s3a.connect.timeout")
+      .internal()
+      .doc("Timeout for AWS s3 connection.")
+      .stringConf
+      .createWithDefault("200s")
+
   val VELOX_ORC_SCAN_ENABLED =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.orc.scan.enabled")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to