This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6100576e1a GH-34118: [C++][Python] Make # of S3 event loop threads configurable (#34134)
6100576e1a is described below
commit 6100576e1a4defd2a9bc6b07ecb850ce1d3a96a5
Author: Weston Pace <[email protected]>
AuthorDate: Wed Feb 22 08:48:54 2023 -0800
GH-34118: [C++][Python] Make # of S3 event loop threads configurable (#34134)
This also changes the default # of threads to 1 per advice in the linked PR.
* Closes: #34118
Authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/CMakeLists.txt | 10 ++++++++
cpp/src/arrow/filesystem/s3fs.cc | 19 +++++++++++++++
cpp/src/arrow/filesystem/s3fs.h | 7 ++++++
.../arrow/filesystem/try_compile/check_s3fs_crt.cc | 28 ++++++++++++++++++++++
python/pyarrow/_s3fs.pyx | 12 +++++++++-
python/pyarrow/fs.py | 6 ++---
python/pyarrow/includes/libarrow_fs.pxd | 2 ++
7 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 5431953d07..bcb8db401e 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -504,6 +504,16 @@ if(ARROW_FILESYSTEM)
list(APPEND ARROW_SRCS filesystem/hdfs.cc)
endif()
if(ARROW_S3)
+ try_compile(S3_HAS_CRT ${CMAKE_CURRENT_BINARY_DIR}/try_compile
+ SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/filesystem/try_compile/check_s3fs_crt.cc"
+ CMAKE_FLAGS "-DINCLUDE_DIRECTORIES=${CURRENT_INCLUDE_DIRECTORIES}"
+ LINK_LIBRARIES ${AWSSDK_LINK_LIBRARIES} CXX_STANDARD 17)
+
+ if(S3_HAS_CRT)
+ message(STATUS "AWS SDK is new enough to have CRT support")
+ add_definitions(-DARROW_S3_HAS_CRT)
+ endif()
+
list(APPEND ARROW_SRCS filesystem/s3fs.cc)
set_source_files_properties(filesystem/s3fs.cc
PROPERTIES SKIP_PRECOMPILE_HEADERS ON
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 16ffe25266..3b5846f575 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -51,6 +51,11 @@
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/core/utils/xml/XmlSerializer.h>
+#ifdef ARROW_S3_HAS_CRT
+#include <aws/crt/io/Bootstrap.h>
+#include <aws/crt/io/EventLoopGroup.h>
+#include <aws/crt/io/HostResolver.h>
+#endif
#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
@@ -2591,6 +2596,20 @@ Status DoInitializeS3(const S3GlobalOptions& options) {
#undef LOG_LEVEL_CASE
+#ifdef ARROW_S3_HAS_CRT
+ aws_options.ioOptions.clientBootstrap_create_fn =
+ [ev_threads = options.num_event_loop_threads]() {
+ // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
+ Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
+ Aws::Crt::Io::DefaultHostResolver default_host_resolver(
+ event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
+ auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
+ "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
+ client_bootstrap->EnableBlockingShutdown();
+ return client_bootstrap;
+ };
+#endif
+
aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options.loggingOptions.logger_create_fn = [] {
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index ba642ebe61..2be16f869d 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -324,6 +324,13 @@ enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
struct ARROW_EXPORT S3GlobalOptions {
S3LogLevel log_level;
+ /// The number of threads to configure when creating AWS' I/O event loop
+ ///
+ /// Defaults to 1 as recommended by AWS' doc when the # of connections is
+ /// expected to be, at most, in the hundreds
+ ///
+ /// For more details see Aws::Crt::Io::EventLoopGroup
+ int num_event_loop_threads = 1;
};
/// Initialize the S3 APIs. It is required to call this function at least once
diff --git a/cpp/src/arrow/filesystem/try_compile/check_s3fs_crt.cc b/cpp/src/arrow/filesystem/try_compile/check_s3fs_crt.cc
new file mode 100644
index 0000000000..83240effdb
--- /dev/null
+++ b/cpp/src/arrow/filesystem/try_compile/check_s3fs_crt.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Dummy file for checking if IOOptions exists in SDKOptions.
+// This was introduced when the AWS SDK switched to using the
+// CRT for I/O.
+
+#include <aws/core/Aws.h>
+
+int main() {
+ Aws::SDKOptions aws_options;
+ auto io_options = aws_options.ioOptions;
+ return 0;
+}
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index c07e78858d..4d35ba92bf 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -36,7 +36,7 @@ cpdef enum S3LogLevel:
Trace = <int8_t> CS3LogLevel_Trace
-def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
+def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal, int num_event_loop_threads=1):
"""
Initialize S3 support
@@ -44,6 +44,8 @@ def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
----------
log_level : S3LogLevel
level of logging
+ num_event_loop_threads : int, default 1
+ how many threads to use for the AWS SDK's I/O event loop
Examples
--------
@@ -51,9 +53,17 @@ def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
"""
cdef CS3GlobalOptions options
options.log_level = <CS3LogLevel> log_level
+ options.num_event_loop_threads = num_event_loop_threads
check_status(CInitializeS3(options))
+def ensure_s3_initialized():
+ """
+ Initialize S3 (with default options) if not already initialized
+ """
+ check_status(CEnsureS3Initialized())
+
+
def finalize_s3():
check_status(CFinalizeS3())
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index 21db243528..e8e53225fb 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -53,12 +53,12 @@ except ImportError:
try:
from pyarrow._s3fs import ( # noqa
AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
- S3FileSystem, S3LogLevel, S3RetryStrategy, finalize_s3,
- initialize_s3, resolve_s3_region)
+ S3FileSystem, S3LogLevel, S3RetryStrategy, ensure_s3_initialized,
+ finalize_s3, initialize_s3, resolve_s3_region)
except ImportError:
_not_imported.append("S3FileSystem")
else:
- initialize_s3()
+ ensure_s3_initialized()
def __getattr__(name):
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index bf22ead83e..bf9fd8aea6 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -129,6 +129,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
cdef struct CS3GlobalOptions "arrow::fs::S3GlobalOptions":
CS3LogLevel log_level
+ int num_event_loop_threads
cdef cppclass CS3ProxyOptions "arrow::fs::S3ProxyOptions":
c_string scheme
@@ -208,6 +209,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
cdef CStatus CInitializeS3 "arrow::fs::InitializeS3"(
const CS3GlobalOptions& options)
+ cdef CStatus CEnsureS3Initialized "arrow::fs::EnsureS3Initialized"()
cdef CStatus CFinalizeS3 "arrow::fs::FinalizeS3"()
cdef CResult[c_string] ResolveS3BucketRegion(const c_string& bucket)