This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch maint-6.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit f72e93a82920aa7b6800a298951868704de2b5bb Author: Antoine Pitrou <[email protected]> AuthorDate: Thu Nov 4 16:14:55 2021 +0100 ARROW-14523: [C++] Fix potential data loss in S3 multipart upload Work around a critical bug in the AWS SDK for C++ that fails to detect errors returned by CompleteMultipartUpload in the body of a 200 OK response: https://github.com/aws/aws-sdk-cpp/issues/658 Closes #11594 from pitrou/ARROW-14523-s3-cmu-error-fix Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]> --- cpp/src/arrow/filesystem/s3fs.cc | 114 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 314abdf..49766d1 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -19,10 +19,13 @@ #include <algorithm> #include <atomic> +#include <chrono> #include <condition_variable> #include <functional> +#include <memory> #include <mutex> #include <sstream> +#include <thread> #include <unordered_map> #include <utility> @@ -41,10 +44,12 @@ #include <aws/core/auth/AWSCredentials.h> #include <aws/core/auth/AWSCredentialsProviderChain.h> #include <aws/core/auth/STSCredentialsProvider.h> +#include <aws/core/client/DefaultRetryStrategy.h> #include <aws/core/client/RetryStrategy.h> #include <aws/core/http/HttpResponse.h> #include <aws/core/utils/logging/ConsoleLogSystem.h> #include <aws/core/utils/stream/PreallocatedStreamBuf.h> +#include <aws/core/utils/xml/XmlSerializer.h> #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> #include <aws/s3/S3Client.h> #include <aws/s3/model/AbortMultipartUploadRequest.h> @@ -563,6 +568,98 @@ class S3Client : public Aws::S3::S3Client { req.SetBucket(ToAwsString(bucket)); return GetBucketRegion(req); } + + S3Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup( + S3Model::CompleteMultipartUploadRequest&& request) const { + // CompletedMultipartUpload can return a 200 OK response with an error + // encoded in the response body, in which case we should either retry + // or propagate the error to the user (see + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html). + // + // Unfortunately the AWS SDK doesn't detect such situations but lets them + // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658). + // + // We work around the issue by registering a DataReceivedEventHandler + // which parses the XML response for embedded errors. + + util::optional<AWSError<Aws::Client::CoreErrors>> aws_error; + + auto handler = [&](const Aws::Http::HttpRequest* http_req, + Aws::Http::HttpResponse* http_resp, + long long) { // NOLINT runtime/int + auto& stream = http_resp->GetResponseBody(); + const auto pos = stream.tellg(); + const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream); + // Rewind stream for later + stream.clear(); + stream.seekg(pos); + + if (doc.WasParseSuccessful()) { + auto root = doc.GetRootElement(); + if (!root.IsNull()) { + // Detect something that looks like an abnormal CompletedMultipartUpload + // response. + if (root.GetName() != "CompleteMultipartUploadResult" || + !root.FirstChild("Error").IsNull() || !root.FirstChild("Errors").IsNull()) { + // Make sure the error marshaller doesn't see a 200 OK + http_resp->SetResponseCode( + Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR); + aws_error = GetErrorMarshaller()->Marshall(*http_resp); + // Rewind stream for later + stream.clear(); + stream.seekg(pos); + } + } + } + }; + + request.SetDataReceivedEventHandler(std::move(handler)); + + // We don't have access to the configured AWS retry strategy + // (m_retryStrategy is a private member of AwsClient), so don't use that. + std::unique_ptr<Aws::Client::RetryStrategy> retry_strategy; + if (s3_retry_strategy_) { + retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_)); + } else { + // Note that DefaultRetryStrategy, unlike StandardRetryStrategy, + // has empty definitions for RequestBookkeeping() and GetSendToken(), + // which simplifies the code below. + retry_strategy.reset(new Aws::Client::DefaultRetryStrategy()); + } + + for (int32_t retries = 0;; retries++) { + aws_error.reset(); + auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request); + if (!outcome.IsSuccess()) { + // Error returned in HTTP headers (or client failure) + return outcome; + } + if (!aws_error.has_value()) { + // Genuinely successful outcome + return outcome; + } + + const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries); + + ARROW_LOG(WARNING) + << "CompletedMultipartUpload got error embedded in a 200 OK response: " + << aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage() + << "\"), retry = " << should_retry; + + if (!should_retry) { + break; + } + const auto delay = std::chrono::milliseconds( + retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries)); + std::this_thread::sleep_for(delay); + } + + DCHECK(aws_error.has_value()); + auto s3_error = AWSError<S3Errors>(std::move(aws_error).value()); + return S3Model::CompleteMultipartUploadOutcome(std::move(s3_error)); + } + + std::shared_ptr<S3RetryStrategy> s3_retry_strategy_; }; // In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool. @@ -617,7 +714,7 @@ class ClientBuilder { const bool use_virtual_addressing = options_.endpoint_override.empty(); - /// Set proxy options if provided + // Set proxy options if provided if (!options_.proxy_options.scheme.empty()) { if (options_.proxy_options.scheme == "http") { client_config_.proxyScheme = Aws::Http::Scheme::HTTP; @@ -641,10 +738,12 @@ class ClientBuilder { client_config_.proxyPassword = ToAwsString(options_.proxy_options.password); } - return std::make_shared<S3Client>( + auto client = std::make_shared<S3Client>( credentials_provider_, client_config_, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing); + client->s3_retry_strategy_ = options_.retry_strategy; + return client; } const S3Options& options() const { return options_; } @@ -1021,9 +1120,8 @@ class ObjectOutputStream final : public io::OutputStream { struct UploadState; public: - ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client, - const io::IOContext& io_context, const S3Path& path, - const S3Options& options, + ObjectOutputStream(std::shared_ptr<S3Client> client, const io::IOContext& io_context, + const S3Path& path, const S3Options& options, const std::shared_ptr<const KeyValueMetadata>& metadata) : client_(std::move(client)), io_context_(io_context), @@ -1118,7 +1216,7 @@ class ObjectOutputStream final : public io::OutputStream { req.SetUploadId(upload_id_); req.SetMultipartUpload(std::move(completed_upload)); - auto outcome = client_->CompleteMultipartUpload(req); + auto outcome = client_->CompleteMultipartUploadWithErrorFixup(std::move(req)); if (!outcome.IsSuccess()) { return ErrorToStatus( std::forward_as_tuple("When completing multiple part upload for key '", @@ -1314,7 +1412,7 @@ class ObjectOutputStream final : public io::OutputStream { } protected: - std::shared_ptr<Aws::S3::S3Client> client_; + std::shared_ptr<S3Client> client_; const io::IOContext io_context_; const S3Path path_; const std::shared_ptr<const KeyValueMetadata> metadata_; @@ -1503,7 +1601,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp public: ClientBuilder builder_; io::IOContext io_context_; - std::shared_ptr<Aws::S3::S3Client> client_; + std::shared_ptr<S3Client> client_; util::optional<S3Backend> backend_; const int32_t kListObjectsMaxKeys = 1000;
