This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cf04f0fe3 [GLUTEN-5659][VL] Add more configs for AWS s3 (#5660)
cf04f0fe3 is described below
commit cf04f0fe3e338169492a28a35cb3562d5d29cdaa
Author: Yan Ma <[email protected]>
AuthorDate: Tue Jun 25 10:34:50 2024 +0800
[GLUTEN-5659][VL] Add more configs for AWS s3 (#5660)
Add more configs for AWS s3
spark.gluten.velox.fs.s3a.retry.mode
spark.gluten.velox.fs.s3a.connect.timeout
spark.hadoop.fs.s3a.retry.limit
spark.hadoop.fs.s3a.connection.maximum
---
cpp/velox/utils/ConfigExtractor.cc | 23 +++++++++++++++++
docs/Configuration.md | 2 ++
.../scala/org/apache/gluten/GlutenConfig.scala | 30 ++++++++++++++++++++++
3 files changed, 55 insertions(+)
diff --git a/cpp/velox/utils/ConfigExtractor.cc
b/cpp/velox/utils/ConfigExtractor.cc
index a71f14322..816166351 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -34,6 +34,13 @@ const bool kVeloxFileHandleCacheEnabledDefault = false;
// Log granularity of AWS C++ SDK
const std::string kVeloxAwsSdkLogLevel = "spark.gluten.velox.awsSdkLogLevel";
const std::string kVeloxAwsSdkLogLevelDefault = "FATAL";
+// Retry mode for AWS s3
+const std::string kVeloxS3RetryMode = "spark.gluten.velox.fs.s3a.retry.mode";
+const std::string kVeloxS3RetryModeDefault = "legacy";
+// Connection timeout for AWS s3
+const std::string kVeloxS3ConnectTimeout =
"spark.gluten.velox.fs.s3a.connect.timeout";
+// Using default fs.s3a.connection.timeout value in hadoop
+const std::string kVeloxS3ConnectTimeoutDefault = "200s";
} // namespace
namespace gluten {
@@ -64,6 +71,10 @@ std::shared_ptr<facebook::velox::core::MemConfig>
getHiveConfig(std::shared_ptr<
bool useInstanceCredentials =
conf->get<bool>("spark.hadoop.fs.s3a.use.instance.credentials", false);
std::string iamRole = conf->get<std::string>("spark.hadoop.fs.s3a.iam.role",
"");
std::string iamRoleSessionName =
conf->get<std::string>("spark.hadoop.fs.s3a.iam.role.session.name", "");
+ std::string retryMaxAttempts =
conf->get<std::string>("spark.hadoop.fs.s3a.retry.limit", "20");
+ std::string retryMode = conf->get<std::string>(kVeloxS3RetryMode,
kVeloxS3RetryModeDefault);
+ std::string maxConnections =
conf->get<std::string>("spark.hadoop.fs.s3a.connection.maximum", "15");
+ std::string connectTimeout = conf->get<std::string>(kVeloxS3ConnectTimeout,
kVeloxS3ConnectTimeoutDefault);
std::string awsSdkLogLevel = conf->get<std::string>(kVeloxAwsSdkLogLevel,
kVeloxAwsSdkLogLevelDefault);
@@ -79,6 +90,14 @@ std::shared_ptr<facebook::velox::core::MemConfig>
getHiveConfig(std::shared_ptr<
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}
+ const char* envRetryMaxAttempts = std::getenv("AWS_MAX_ATTEMPTS");
+ if (envRetryMaxAttempts != nullptr) {
+ retryMaxAttempts = std::string(envRetryMaxAttempts);
+ }
+ const char* envRetryMode = std::getenv("AWS_RETRY_MODE");
+ if (envRetryMode != nullptr) {
+ retryMode = std::string(envRetryMode);
+ }
if (useInstanceCredentials) {
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3UseInstanceCredentials]
= "true";
@@ -98,6 +117,10 @@ std::shared_ptr<facebook::velox::core::MemConfig>
getHiveConfig(std::shared_ptr<
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3SSLEnabled] =
sslEnabled ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3PathStyleAccess] =
pathStyleAccess ? "true" : "false";
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3LogLevel] =
awsSdkLogLevel;
+ hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxAttempts] =
retryMaxAttempts;
+ hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3RetryMode] =
retryMode;
+ hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3MaxConnections]
= maxConnections;
+ hiveConfMap[facebook::velox::connector::hive::HiveConfig::kS3ConnectTimeout]
= connectTimeout;
#endif
#ifdef ENABLE_GCS
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 089675286..2c2bd4de1 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -89,6 +89,8 @@ The following configurations are related to Velox settings.
| spark.gluten.sql.columnar.backend.velox.maxCoalescedBytes | Set
the max coalesced bytes for velox file scan.
|
|
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | Set
prefetch cache min pct for velox file scan.
|
|
| spark.gluten.velox.awsSdkLogLevel | Log
granularity of AWS C++ SDK in velox.
| FATAL
|
+| spark.gluten.velox.fs.s3a.retry.mode | Retry
mode for AWS S3 connection errors; one of "legacy", "standard" or "adaptive".
| legacy
|
+| spark.gluten.velox.fs.s3a.connect.timeout |
Timeout for AWS s3 connection.
| 200s
|
| spark.gluten.sql.columnar.backend.velox.orc.scan.enabled |
Enable velox orc scan. If disabled, vanilla spark orc scan will be used.
| true
|
| spark.gluten.sql.complexType.scan.fallback.enabled | Force
fallback for complex type scan, including struct, map, array.
| true
|
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 4b4e29e7d..cc2d6ac5f 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -436,6 +436,10 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)
+ def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE)
+
+ def awsConnectionTimeout: String = conf.getConf(AWS_S3_CONNECT_TIMEOUT)
+
def enableCastAvgAggregateFunction: Boolean =
conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
def enableGlutenCostEvaluator: Boolean = conf.getConf(COST_EVALUATOR_ENABLED)
@@ -488,6 +492,10 @@ object GlutenConfig {
val SPARK_S3_IAM: String = HADOOP_PREFIX + S3_IAM_ROLE
val S3_IAM_ROLE_SESSION_NAME = "fs.s3a.iam.role.session.name"
val SPARK_S3_IAM_SESSION_NAME: String = HADOOP_PREFIX +
S3_IAM_ROLE_SESSION_NAME
+ val S3_RETRY_MAX_ATTEMPTS = "fs.s3a.retry.limit"
+ val SPARK_S3_RETRY_MAX_ATTEMPTS: String = HADOOP_PREFIX +
S3_RETRY_MAX_ATTEMPTS
+ val S3_CONNECTION_MAXIMUM = "fs.s3a.connection.maximum"
+ val SPARK_S3_CONNECTION_MAXIMUM: String = HADOOP_PREFIX +
S3_CONNECTION_MAXIMUM
// Hardware accelerators backend
val GLUTEN_SHUFFLE_CODEC_BACKEND =
"spark.gluten.sql.columnar.shuffle.codecBackend"
@@ -642,6 +650,10 @@ object GlutenConfig {
SPARK_S3_USE_INSTANCE_CREDENTIALS,
SPARK_S3_IAM,
SPARK_S3_IAM_SESSION_NAME,
+ SPARK_S3_RETRY_MAX_ATTEMPTS,
+ SPARK_S3_CONNECTION_MAXIMUM,
+ AWS_S3_CONNECT_TIMEOUT.key,
+ AWS_S3_RETRY_MODE.key,
AWS_SDK_LOG_LEVEL.key,
// gcs config
SPARK_GCS_STORAGE_ROOT_URL,
@@ -693,6 +705,10 @@ object GlutenConfig {
(SPARK_S3_USE_INSTANCE_CREDENTIALS, "false"),
(SPARK_S3_IAM, ""),
(SPARK_S3_IAM_SESSION_NAME, ""),
+ (SPARK_S3_RETRY_MAX_ATTEMPTS, "20"),
+ (SPARK_S3_CONNECTION_MAXIMUM, "15"),
+ (AWS_S3_CONNECT_TIMEOUT.key, AWS_S3_CONNECT_TIMEOUT.defaultValueString),
+ (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
@@ -1941,6 +1957,20 @@ object GlutenConfig {
.stringConf
.createWithDefault("FATAL")
+ val AWS_S3_RETRY_MODE =
+ buildConf("spark.gluten.velox.fs.s3a.retry.mode")
+ .internal()
+ .doc("Retry mode for AWS s3 connection error: legacy, standard and
adaptive.")
+ .stringConf
+ .createWithDefault("legacy")
+
+ val AWS_S3_CONNECT_TIMEOUT =
+ buildConf("spark.gluten.velox.fs.s3a.connect.timeout")
+ .internal()
+ .doc("Timeout for AWS s3 connection.")
+ .stringConf
+ .createWithDefault("200s")
+
val VELOX_ORC_SCAN_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.orc.scan.enabled")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]