This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-6.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit f72e93a82920aa7b6800a298951868704de2b5bb
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Nov 4 16:14:55 2021 +0100

    ARROW-14523: [C++] Fix potential data loss in S3 multipart upload
    
    Work around a critical bug in the AWS SDK for C++ that fails to detect
    errors returned by CompleteMultipartUpload in the body of a 200 OK response:
    https://github.com/aws/aws-sdk-cpp/issues/658
    
    Closes #11594 from pitrou/ARROW-14523-s3-cmu-error-fix
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/filesystem/s3fs.cc | 114 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 106 insertions(+), 8 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 314abdf..49766d1 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -19,10 +19,13 @@
 
 #include <algorithm>
 #include <atomic>
+#include <chrono>
 #include <condition_variable>
 #include <functional>
+#include <memory>
 #include <mutex>
 #include <sstream>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 
@@ -41,10 +44,12 @@
 #include <aws/core/auth/AWSCredentials.h>
 #include <aws/core/auth/AWSCredentialsProviderChain.h>
 #include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/client/DefaultRetryStrategy.h>
 #include <aws/core/client/RetryStrategy.h>
 #include <aws/core/http/HttpResponse.h>
 #include <aws/core/utils/logging/ConsoleLogSystem.h>
 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
+#include <aws/core/utils/xml/XmlSerializer.h>
 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/AbortMultipartUploadRequest.h>
@@ -563,6 +568,98 @@ class S3Client : public Aws::S3::S3Client {
     req.SetBucket(ToAwsString(bucket));
     return GetBucketRegion(req);
   }
+
+  S3Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup(
+      S3Model::CompleteMultipartUploadRequest&& request) const {
+    // CompletedMultipartUpload can return a 200 OK response with an error
+    // encoded in the response body, in which case we should either retry
+    // or propagate the error to the user (see
+    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
+    //
+    // Unfortunately the AWS SDK doesn't detect such situations but lets them
+    // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
+    //
+    // We work around the issue by registering a DataReceivedEventHandler
+    // which parses the XML response for embedded errors.
+
+    util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;
+
+    auto handler = [&](const Aws::Http::HttpRequest* http_req,
+                       Aws::Http::HttpResponse* http_resp,
+                       long long) {  // NOLINT runtime/int
+      auto& stream = http_resp->GetResponseBody();
+      const auto pos = stream.tellg();
+      const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);
+      // Rewind stream for later
+      stream.clear();
+      stream.seekg(pos);
+
+      if (doc.WasParseSuccessful()) {
+        auto root = doc.GetRootElement();
+        if (!root.IsNull()) {
+          // Detect something that looks like an abnormal CompletedMultipartUpload
+          // response.
+          if (root.GetName() != "CompleteMultipartUploadResult" ||
+              !root.FirstChild("Error").IsNull() || !root.FirstChild("Errors").IsNull()) {
+            // Make sure the error marshaller doesn't see a 200 OK
+            http_resp->SetResponseCode(
+                Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
+            aws_error = GetErrorMarshaller()->Marshall(*http_resp);
+            // Rewind stream for later
+            stream.clear();
+            stream.seekg(pos);
+          }
+        }
+      }
+    };
+
+    request.SetDataReceivedEventHandler(std::move(handler));
+
+    // We don't have access to the configured AWS retry strategy
+    // (m_retryStrategy is a private member of AwsClient), so don't use that.
+    std::unique_ptr<Aws::Client::RetryStrategy> retry_strategy;
+    if (s3_retry_strategy_) {
+      retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_));
+    } else {
+      // Note that DefaultRetryStrategy, unlike StandardRetryStrategy,
+      // has empty definitions for RequestBookkeeping() and GetSendToken(),
+      // which simplifies the code below.
+      retry_strategy.reset(new Aws::Client::DefaultRetryStrategy());
+    }
+
+    for (int32_t retries = 0;; retries++) {
+      aws_error.reset();
+      auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request);
+      if (!outcome.IsSuccess()) {
+        // Error returned in HTTP headers (or client failure)
+        return outcome;
+      }
+      if (!aws_error.has_value()) {
+        // Genuinely successful outcome
+        return outcome;
+      }
+
+      const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries);
+
+      ARROW_LOG(WARNING)
+          << "CompletedMultipartUpload got error embedded in a 200 OK response: "
+          << aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage()
+          << "\"), retry = " << should_retry;
+
+      if (!should_retry) {
+        break;
+      }
+      const auto delay = std::chrono::milliseconds(
+          retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries));
+      std::this_thread::sleep_for(delay);
+    }
+
+    DCHECK(aws_error.has_value());
+    auto s3_error = AWSError<S3Errors>(std::move(aws_error).value());
+    return S3Model::CompleteMultipartUploadOutcome(std::move(s3_error));
+  }
+
+  std::shared_ptr<S3RetryStrategy> s3_retry_strategy_;
 };
 
 // In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool.
@@ -617,7 +714,7 @@ class ClientBuilder {
 
     const bool use_virtual_addressing = options_.endpoint_override.empty();
 
-    /// Set proxy options if provided
+    // Set proxy options if provided
     if (!options_.proxy_options.scheme.empty()) {
       if (options_.proxy_options.scheme == "http") {
         client_config_.proxyScheme = Aws::Http::Scheme::HTTP;
@@ -641,10 +738,12 @@ class ClientBuilder {
       client_config_.proxyPassword = ToAwsString(options_.proxy_options.password);
     }
 
-    return std::make_shared<S3Client>(
+    auto client = std::make_shared<S3Client>(
         credentials_provider_, client_config_,
         Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
         use_virtual_addressing);
+    client->s3_retry_strategy_ = options_.retry_strategy;
+    return client;
   }
 
   const S3Options& options() const { return options_; }
@@ -1021,9 +1120,8 @@ class ObjectOutputStream final : public io::OutputStream {
   struct UploadState;
 
  public:
-  ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client,
-                     const io::IOContext& io_context, const S3Path& path,
-                     const S3Options& options,
+  ObjectOutputStream(std::shared_ptr<S3Client> client, const io::IOContext& io_context,
+                     const S3Path& path, const S3Options& options,
                      const std::shared_ptr<const KeyValueMetadata>& metadata)
       : client_(std::move(client)),
         io_context_(io_context),
@@ -1118,7 +1216,7 @@ class ObjectOutputStream final : public io::OutputStream {
     req.SetUploadId(upload_id_);
     req.SetMultipartUpload(std::move(completed_upload));
 
-    auto outcome = client_->CompleteMultipartUpload(req);
+    auto outcome = client_->CompleteMultipartUploadWithErrorFixup(std::move(req));
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(
           std::forward_as_tuple("When completing multiple part upload for key '",
@@ -1314,7 +1412,7 @@ class ObjectOutputStream final : public io::OutputStream {
   }
 
  protected:
-  std::shared_ptr<Aws::S3::S3Client> client_;
+  std::shared_ptr<S3Client> client_;
   const io::IOContext io_context_;
   const S3Path path_;
   const std::shared_ptr<const KeyValueMetadata> metadata_;
@@ -1503,7 +1601,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
  public:
   ClientBuilder builder_;
   io::IOContext io_context_;
-  std::shared_ptr<Aws::S3::S3Client> client_;
+  std::shared_ptr<S3Client> client_;
   util::optional<S3Backend> backend_;
 
   const int32_t kListObjectsMaxKeys = 1000;

Reply via email to