This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6100576e1a GH-34118: [C++][Python] Make # of S3 event loop threads configurable (#34134)
6100576e1a is described below

commit 6100576e1a4defd2a9bc6b07ecb850ce1d3a96a5
Author: Weston Pace <[email protected]>
AuthorDate: Wed Feb 22 08:48:54 2023 -0800

    GH-34118: [C++][Python] Make # of S3 event loop threads configurable (#34134)
    
    This also changes the default # of threads to 1 per advice in the linked PR.
    * Closes: #34118
    
    Authored-by: Weston Pace <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                       | 10 ++++++++
 cpp/src/arrow/filesystem/s3fs.cc                   | 19 +++++++++++++++
 cpp/src/arrow/filesystem/s3fs.h                    |  7 ++++++
 .../arrow/filesystem/try_compile/check_s3fs_crt.cc | 28 ++++++++++++++++++++++
 python/pyarrow/_s3fs.pyx                           | 12 +++++++++-
 python/pyarrow/fs.py                               |  6 ++---
 python/pyarrow/includes/libarrow_fs.pxd            |  2 ++
 7 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 5431953d07..bcb8db401e 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -504,6 +504,16 @@ if(ARROW_FILESYSTEM)
     list(APPEND ARROW_SRCS filesystem/hdfs.cc)
   endif()
   if(ARROW_S3)
+    try_compile(S3_HAS_CRT ${CMAKE_CURRENT_BINARY_DIR}/try_compile
+                SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/filesystem/try_compile/check_s3fs_crt.cc"
+                CMAKE_FLAGS "-DINCLUDE_DIRECTORIES=${CURRENT_INCLUDE_DIRECTORIES}"
+                LINK_LIBRARIES ${AWSSDK_LINK_LIBRARIES} CXX_STANDARD 17)
+
+    if(S3_HAS_CRT)
+      message(STATUS "AWS SDK is new enough to have CRT support")
+      add_definitions(-DARROW_S3_HAS_CRT)
+    endif()
+
     list(APPEND ARROW_SRCS filesystem/s3fs.cc)
     set_source_files_properties(filesystem/s3fs.cc
                                 PROPERTIES SKIP_PRECOMPILE_HEADERS ON
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 16ffe25266..3b5846f575 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -51,6 +51,11 @@
 #include <aws/core/utils/logging/ConsoleLogSystem.h>
 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
 #include <aws/core/utils/xml/XmlSerializer.h>
+#ifdef ARROW_S3_HAS_CRT
+#include <aws/crt/io/Bootstrap.h>
+#include <aws/crt/io/EventLoopGroup.h>
+#include <aws/crt/io/HostResolver.h>
+#endif
 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/AbortMultipartUploadRequest.h>
@@ -2591,6 +2596,20 @@ Status DoInitializeS3(const S3GlobalOptions& options) {
 
 #undef LOG_LEVEL_CASE
 
+#ifdef ARROW_S3_HAS_CRT
+  aws_options.ioOptions.clientBootstrap_create_fn =
+      [ev_threads = options.num_event_loop_threads]() {
+        // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
+        Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
+        Aws::Crt::Io::DefaultHostResolver default_host_resolver(
+            event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
+        auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
+            "Aws_Init_Cleanup", event_loop_group, default_host_resolver);
+        client_bootstrap->EnableBlockingShutdown();
+        return client_bootstrap;
+      };
+#endif
+
   aws_options.loggingOptions.logLevel = aws_log_level;
   // By default the AWS SDK logs to files, log to console instead
   aws_options.loggingOptions.logger_create_fn = [] {
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index ba642ebe61..2be16f869d 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -324,6 +324,13 @@ enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
 
 struct ARROW_EXPORT S3GlobalOptions {
   S3LogLevel log_level;
+  /// The number of threads to configure when creating AWS' I/O event loop
+  ///
+  /// Defaults to 1 as recommended by AWS' doc when the # of connections is
+  /// expected to be, at most, in the hundreds
+  ///
+  /// For more details see Aws::Crt::Io::EventLoopGroup
+  int num_event_loop_threads = 1;
 };
 
 /// Initialize the S3 APIs.  It is required to call this function at least once
diff --git a/cpp/src/arrow/filesystem/try_compile/check_s3fs_crt.cc b/cpp/src/arrow/filesystem/try_compile/check_s3fs_crt.cc
new file mode 100644
index 0000000000..83240effdb
--- /dev/null
+++ b/cpp/src/arrow/filesystem/try_compile/check_s3fs_crt.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Dummy file for checking if IOOptions exists in SDKOptions.
+// This was introduced when the AWS SDK switched to using the
+// CRT for I/O.
+
+#include <aws/core/Aws.h>
+
+int main() {
+  Aws::SDKOptions aws_options;
+  auto io_options = aws_options.ioOptions;
+  return 0;
+}
diff --git a/python/pyarrow/_s3fs.pyx b/python/pyarrow/_s3fs.pyx
index c07e78858d..4d35ba92bf 100644
--- a/python/pyarrow/_s3fs.pyx
+++ b/python/pyarrow/_s3fs.pyx
@@ -36,7 +36,7 @@ cpdef enum S3LogLevel:
     Trace = <int8_t> CS3LogLevel_Trace
 
 
-def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
+def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal, int num_event_loop_threads=1):
     """
     Initialize S3 support
 
@@ -44,6 +44,8 @@ def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
     ----------
     log_level : S3LogLevel
         level of logging
+    num_event_loop_threads : int, default 1
+        how many threads to use for the AWS SDK's I/O event loop
 
     Examples
     --------
@@ -51,9 +53,17 @@ def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
     """
     cdef CS3GlobalOptions options
     options.log_level = <CS3LogLevel> log_level
+    options.num_event_loop_threads = num_event_loop_threads
     check_status(CInitializeS3(options))
 
 
+def ensure_s3_initialized():
+    """
+    Initialize S3 (with default options) if not already initialized
+    """
+    check_status(CEnsureS3Initialized())
+
+
 def finalize_s3():
     check_status(CFinalizeS3())
 
diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py
index 21db243528..e8e53225fb 100644
--- a/python/pyarrow/fs.py
+++ b/python/pyarrow/fs.py
@@ -53,12 +53,12 @@ except ImportError:
 try:
     from pyarrow._s3fs import (  # noqa
         AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
-        S3FileSystem, S3LogLevel, S3RetryStrategy, finalize_s3,
-        initialize_s3, resolve_s3_region)
+        S3FileSystem, S3LogLevel, S3RetryStrategy, ensure_s3_initialized,
+        finalize_s3, initialize_s3, resolve_s3_region)
 except ImportError:
     _not_imported.append("S3FileSystem")
 else:
-    initialize_s3()
+    ensure_s3_initialized()
 
 
 def __getattr__(name):
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index bf22ead83e..bf9fd8aea6 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -129,6 +129,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
 
     cdef struct CS3GlobalOptions "arrow::fs::S3GlobalOptions":
         CS3LogLevel log_level
+        int num_event_loop_threads
 
     cdef cppclass CS3ProxyOptions "arrow::fs::S3ProxyOptions":
         c_string scheme
@@ -208,6 +209,7 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
 
     cdef CStatus CInitializeS3 "arrow::fs::InitializeS3"(
         const CS3GlobalOptions& options)
+    cdef CStatus CEnsureS3Initialized "arrow::fs::EnsureS3Initialized"()
     cdef CStatus CFinalizeS3 "arrow::fs::FinalizeS3"()
 
     cdef CResult[c_string] ResolveS3BucketRegion(const c_string& bucket)

Reply via email to