This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 27d21d4 ARROW-11907: [C++] Use our own executor in S3FileSystem
27d21d4 is described below
commit 27d21d4d3b18280805ed7118c089d930fd018f11
Author: Antoine Pitrou <[email protected]>
AuthorDate: Wed Mar 17 11:26:24 2021 +0100
ARROW-11907: [C++] Use our own executor in S3FileSystem
The async APIs in the AWS SDK merely spawn a new thread each time
they are called.
By using our own executor, we schedule requests on our IO thread pool,
and we allow for potential cancellation.
Closes #9678 from pitrou/ARROW-11907-s3fs-executor
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/filesystem/s3fs.cc | 211 +++++++++++++++++++---------------
cpp/src/arrow/filesystem/s3fs_test.cc | 5 +-
cpp/src/arrow/util/future.cc | 4 +
cpp/src/arrow/util/future.h | 4 +
cpp/src/arrow/util/future_test.cc | 16 ++-
5 files changed, 145 insertions(+), 95 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 6b2b708..1940f4d 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -79,6 +79,7 @@
#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
#include "arrow/util/windows_fixup.h"
namespace arrow {
@@ -488,7 +489,7 @@ class ClientBuilder {
Aws::Client::ClientConfiguration* mutable_config() { return &client_config_;
}
- Result<std::unique_ptr<S3Client>> BuildClient() {
+ Result<std::shared_ptr<S3Client>> BuildClient() {
credentials_provider_ = options_.credentials_provider;
if (!options_.region.empty()) {
client_config_.region = ToAwsString(options_.region);
@@ -510,10 +511,10 @@ class ClientBuilder {
}
const bool use_virtual_addressing = options_.endpoint_override.empty();
- return std::unique_ptr<S3Client>(
- new S3Client(credentials_provider_, client_config_,
- Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
- use_virtual_addressing));
+ return std::make_shared<S3Client>(
+ credentials_provider_, client_config_,
+ Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+ use_virtual_addressing);
}
const S3Options& options() const { return options_; }
@@ -585,7 +586,7 @@ class RegionResolver {
}
ClientBuilder builder_;
- std::unique_ptr<S3Client> client_;
+ std::shared_ptr<S3Client> client_;
std::mutex cache_mutex_;
// XXX Should cache size be bounded? It must be quite unusual to query
millions
@@ -630,11 +631,10 @@ Result<S3Model::GetObjectResult>
GetObjectRange(Aws::S3::S3Client* client,
// A RandomAccessFile that reads from a S3 object
class ObjectInputFile final : public io::RandomAccessFile {
public:
- ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
+ ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,
const io::IOContext& io_context, const S3Path& path,
int64_t size = kNoSize)
- : fs_(std::move(fs)),
- client_(client),
+ : client_(std::move(client)),
io_context_(io_context),
path_(path),
content_length_(size) {}
@@ -687,7 +687,6 @@ class ObjectInputFile final : public io::RandomAccessFile {
// RandomAccessFile APIs
Status Close() override {
- fs_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
@@ -724,7 +723,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
// Read the desired range of bytes
ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
- GetObjectRange(client_, path_, position, nbytes,
out));
+ GetObjectRange(client_.get(), path_, position,
nbytes, out));
auto& stream = result.GetBody();
stream.ignore(nbytes);
@@ -763,8 +762,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
}
protected:
- std::shared_ptr<FileSystem> fs_; // Owner of S3Client
- Aws::S3::S3Client* client_;
+ std::shared_ptr<Aws::S3::S3Client> client_;
const io::IOContext io_context_;
S3Path path_;
@@ -785,11 +783,10 @@ class ObjectOutputStream final : public io::OutputStream {
struct UploadState;
public:
- ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
+ ObjectOutputStream(std::shared_ptr<Aws::S3::S3Client> client,
const io::IOContext& io_context, const S3Path& path,
const S3Options& options)
- : fs_(std::move(fs)),
- client_(client),
+ : client_(std::move(client)),
io_context_(io_context),
path_(path),
options_(options) {}
@@ -837,7 +834,6 @@ class ObjectOutputStream final : public io::OutputStream {
outcome.GetError());
}
current_part_.reset();
- fs_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
@@ -884,7 +880,6 @@ class ObjectOutputStream final : public io::OutputStream {
outcome.GetError());
}
- fs_.reset();
client_ = nullptr;
closed_ = true;
return Status::OK();
@@ -981,10 +976,6 @@ class ObjectOutputStream final : public io::OutputStream {
AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
}
} else {
- std::unique_lock<std::mutex> lock(upload_state_->mutex);
- auto state = upload_state_; // Keep upload state alive in closure
- auto part_number = part_number_;
-
// If the data isn't owned, make an immutable copy for the lifetime of
the closure
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes,
io_context_.pool()));
@@ -996,24 +987,23 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetBody(
std::make_shared<StringViewStream>(owned_buffer->data(),
owned_buffer->size()));
- auto handler =
- [state, owned_buffer, part_number](
- const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req,
- const S3Model::UploadPartOutcome& outcome,
- const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
-> void {
- std::unique_lock<std::mutex> lock(state->mutex);
- if (!outcome.IsSuccess()) {
- state->status &= UploadPartError(req, outcome);
- } else {
- AddCompletedPart(state, part_number, outcome.GetResult());
- }
- // Notify completion, regardless of success / error status
- if (--state->parts_in_progress == 0) {
- state->cv.notify_all();
- }
+ {
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ ++upload_state_->parts_in_progress;
+ }
+ auto client = client_;
+ ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
+ io_context_.stop_token(), [client,
req]() {
+ return client->UploadPart(req);
+ }));
+ // The closure keeps the buffer and the upload state alive
+ auto state = upload_state_;
+ auto part_number = part_number_;
+ auto handler = [owned_buffer, state, part_number,
+ req](const Result<S3Model::UploadPartOutcome>& result)
-> void {
+ HandleUploadOutcome(state, part_number, req, result);
};
- ++upload_state_->parts_in_progress;
- client_->UploadPartAsync(req, handler);
+ fut.AddCallback(std::move(handler));
}
++part_number_;
@@ -1035,6 +1025,26 @@ class ObjectOutputStream final : public io::OutputStream
{
return Status::OK();
}
+ static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
+ int part_number, const
S3Model::UploadPartRequest& req,
+ const Result<S3Model::UploadPartOutcome>&
result) {
+ std::unique_lock<std::mutex> lock(state->mutex);
+ if (!result.ok()) {
+ state->status &= result.status();
+ } else {
+ const auto& outcome = *result;
+ if (!outcome.IsSuccess()) {
+ state->status &= UploadPartError(req, outcome);
+ } else {
+ AddCompletedPart(state, part_number, outcome.GetResult());
+ }
+ }
+ // Notify completion
+ if (--state->parts_in_progress == 0) {
+ state->cv.notify_all();
+ }
+ }
+
static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int
part_number,
const S3Model::UploadPartResult& result) {
S3Model::CompletedPart part;
@@ -1059,8 +1069,7 @@ class ObjectOutputStream final : public io::OutputStream {
}
protected:
- std::shared_ptr<FileSystem> fs_; // Owner of S3Client
- Aws::S3::S3Client* client_;
+ std::shared_ptr<Aws::S3::S3Client> client_;
const io::IOContext io_context_;
S3Path path_;
const S3Options& options_;
@@ -1081,8 +1090,6 @@ class ObjectOutputStream final : public io::OutputStream {
Aws::Vector<S3Model::CompletedPart> completed_parts;
int64_t parts_in_progress = 0;
Status status;
-
- UploadState() : status(Status::OK()) {}
};
std::shared_ptr<UploadState> upload_state_;
};
@@ -1106,7 +1113,8 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
- Aws::S3::S3Client* client_;
+ std::shared_ptr<Aws::S3::S3Client> client_;
+ io::IOContext io_context_;
const std::string bucket_;
const std::string base_dir_;
const int32_t max_keys_;
@@ -1120,10 +1128,12 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
return self->DoWalk();
}
- TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string
base_dir,
- int32_t max_keys, ResultHandler result_handler, ErrorHandler
error_handler,
+ TreeWalker(std::shared_ptr<Aws::S3::S3Client> client, io::IOContext
io_context,
+ std::string bucket, std::string base_dir, int32_t max_keys,
+ ResultHandler result_handler, ErrorHandler error_handler,
RecursionHandler recursion_handler)
: client_(std::move(client)),
+ io_context_(io_context),
bucket_(std::move(bucket)),
base_dir_(std::move(base_dir)),
max_keys_(max_keys),
@@ -1159,23 +1169,41 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
int32_t nesting_depth;
S3Model::ListObjectsV2Request req;
- void operator()(const Aws::S3::S3Client*, const
S3Model::ListObjectsV2Request&,
- const S3Model::ListObjectsV2Outcome& outcome,
- const std::shared_ptr<const
Aws::Client::AsyncCallerContext>&) {
+ void operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
// Serialize calls to operation-specific handlers
std::unique_lock<std::mutex> guard(walker->mutex_);
if (walker->is_finished()) {
// Early exit: avoid executing handlers if DoWalk() returned
return;
}
+ if (!result.ok()) {
+ HandleError(result.status());
+ return;
+ }
+ const auto& outcome = *result;
if (!outcome.IsSuccess()) {
Status st = walker->error_handler_(outcome.GetError());
- walker->ListObjectsFinished(std::move(st));
+ HandleError(std::move(st));
return;
}
HandleResult(outcome.GetResult());
}
+ void SpawnListObjectsV2() {
+ auto walker = this->walker;
+ auto req = this->req;
+ auto maybe_fut = walker->io_context_.executor()->Submit(
+ walker->io_context_.stop_token(),
+ [walker, req]() { return walker->client_->ListObjectsV2(req); });
+ if (!maybe_fut.ok()) {
+ HandleError(maybe_fut.status());
+ return;
+ }
+ maybe_fut->AddCallback(*this);
+ }
+
+ void HandleError(Status status) {
walker->ListObjectsFinished(std::move(status)); }
+
void HandleResult(const S3Model::ListObjectsV2Result& result) {
bool recurse = result.GetCommonPrefixes().size() > 0;
if (recurse) {
@@ -1199,7 +1227,7 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
if (result.GetIsTruncated()) {
DCHECK(!result.GetNextContinuationToken().empty());
req.SetContinuationToken(result.GetNextContinuationToken());
- walker->client_->ListObjectsV2Async(req, *this);
+ SpawnListObjectsV2();
} else {
walker->ListObjectsFinished(Status::OK());
}
@@ -1212,7 +1240,7 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
}
req.SetDelimiter(Aws::String() + kSep);
req.SetMaxKeys(walker->max_keys_);
- walker->client_->ListObjectsV2Async(req, *this);
+ SpawnListObjectsV2();
}
};
@@ -1241,7 +1269,8 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
class S3FileSystem::Impl {
public:
ClientBuilder builder_;
- std::unique_ptr<Aws::S3::S3Client> client_;
+ io::IOContext io_context_;
+ std::shared_ptr<Aws::S3::S3Client> client_;
util::optional<S3Backend> backend_;
const int32_t kListObjectsMaxKeys = 1000;
@@ -1250,7 +1279,8 @@ class S3FileSystem::Impl {
// Limit recursing depth, since a recursion bomb can be created
const int32_t kMaxNestingDepth = 100;
- explicit Impl(S3Options options) : builder_(std::move(options)) {}
+ explicit Impl(S3Options options, io::IOContext io_context)
+ : builder_(std::move(options)), io_context_(io_context) {}
Status Init() { return builder_.BuildClient().Value(&client_); }
@@ -1442,7 +1472,7 @@ class S3FileSystem::Impl {
return Status::OK();
};
- RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key,
kListObjectsMaxKeys,
+ RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key,
kListObjectsMaxKeys,
handle_results, handle_error,
handle_recursion));
// If no contents were found, perhaps it's an empty "directory",
@@ -1487,22 +1517,23 @@ class S3FileSystem::Impl {
return true; // Recurse
};
- return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+ return TreeWalker::Walk(client_, io_context_, bucket, key,
kListObjectsMaxKeys,
handle_results, handle_error, handle_recursion);
}
// Delete multiple objects at once
- Status DeleteObjects(const std::string& bucket, const
std::vector<std::string>& keys) {
- struct DeleteHandler {
- Future<> future = Future<>::Make();
-
- // Callback for DeleteObjectsAsync
- void operator()(const Aws::S3::S3Client*, const
S3Model::DeleteObjectsRequest& req,
- const S3Model::DeleteObjectsOutcome& outcome,
- const std::shared_ptr<const
Aws::Client::AsyncCallerContext>&) {
+ Future<> DeleteObjectsAsync(const std::string& bucket,
+ const std::vector<std::string>& keys) {
+ struct DeleteCallback {
+ const std::string bucket;
+
+ Status operator()(const Result<S3Model::DeleteObjectsOutcome>& result) {
+ if (!result.ok()) {
+ return result.status();
+ }
+ const auto& outcome = *result;
if (!outcome.IsSuccess()) {
- future.MarkFinished(ErrorToStatus(outcome.GetError()));
- return;
+ return ErrorToStatus(outcome.GetError());
}
// Also need to check per-key errors, even on successful outcome
// See
@@ -1511,23 +1542,22 @@ class S3FileSystem::Impl {
if (!errors.empty()) {
std::stringstream ss;
ss << "Got the following " << errors.size()
- << " errors when deleting objects in S3 bucket '" <<
req.GetBucket()
- << "':\n";
+ << " errors when deleting objects in S3 bucket '" << bucket <<
"':\n";
for (const auto& error : errors) {
ss << "- key '" << error.GetKey() << "': " << error.GetMessage()
<< "\n";
}
- future.MarkFinished(Status::IOError(ss.str()));
- } else {
- future.MarkFinished();
+ return Status::IOError(ss.str());
}
+ return Status::OK();
}
};
const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
- std::vector<DeleteHandler> delete_handlers;
- std::vector<Future<>*> futures;
- delete_handlers.reserve(keys.size() / chunk_size + 1);
- futures.reserve(delete_handlers.capacity());
+ DeleteCallback delete_cb{bucket};
+ auto client = client_;
+
+ std::vector<Future<>> futures;
+ futures.reserve(keys.size() / chunk_size + 1);
for (size_t start = 0; start < keys.size(); start += chunk_size) {
S3Model::DeleteObjectsRequest req;
@@ -1537,16 +1567,18 @@ class S3FileSystem::Impl {
}
req.SetBucket(ToAwsString(bucket));
req.SetDelete(std::move(del));
- delete_handlers.emplace_back();
- futures.push_back(&delete_handlers.back().future);
- client_->DeleteObjectsAsync(req, delete_handlers.back());
+ ARROW_ASSIGN_OR_RAISE(auto fut, io_context_.executor()->Submit(
+ io_context_.stop_token(), [client,
req]() {
+ return client->DeleteObjects(req);
+ }));
+ futures.push_back(std::move(fut).Then(delete_cb));
}
- WaitForAll(futures);
- for (const auto* fut : futures) {
- RETURN_NOT_OK(fut->status());
- }
- return Status::OK();
+ return AllComplete(futures);
+ }
+
+ Status DeleteObjects(const std::string& bucket, const
std::vector<std::string>& keys) {
+ return DeleteObjectsAsync(bucket, keys).status();
}
Status DeleteDirContents(const std::string& bucket, const std::string& key) {
@@ -1601,8 +1633,7 @@ class S3FileSystem::Impl {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
- auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(),
client_.get(),
- fs->io_context(), path);
+ auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(),
path);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
@@ -1619,15 +1650,15 @@ class S3FileSystem::Impl {
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
- auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(),
client_.get(),
- fs->io_context(), path,
info.size());
+ auto ptr =
+ std::make_shared<ObjectInputFile>(client_, fs->io_context(), path,
info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
};
S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext&
io_context)
- : FileSystem(io_context), impl_(new Impl{options}) {
+ : FileSystem(io_context), impl_(new Impl{options, io_context}) {
default_async_is_sync_ = false;
}
@@ -1908,8 +1939,8 @@ Result<std::shared_ptr<io::OutputStream>>
S3FileSystem::OpenOutputStream(
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
- auto ptr = std::make_shared<ObjectOutputStream>(
- shared_from_this(), impl_->client_.get(), io_context(), path,
impl_->options());
+ auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_,
io_context(), path,
+ impl_->options());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc
b/cpp/src/arrow/filesystem/s3fs_test.cc
index 99e6b3f..c79d9f7 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -464,10 +464,10 @@ class TestS3FS : public S3TestMixin {
std::weak_ptr<S3FileSystem> weak_fs(fs_);
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5"));
fs_.reset();
- ASSERT_FALSE(weak_fs.expired());
- ASSERT_OK(stream->Write("some data"));
+ ASSERT_OK(stream->Write("some other data"));
ASSERT_OK(stream->Close());
ASSERT_TRUE(weak_fs.expired());
+ AssertObjectContents(client_.get(), "bucket", "newfile5", "some other
data");
}
void TestOpenOutputStreamAbort() {
@@ -802,7 +802,6 @@ TEST_F(TestS3FS, OpenInputStream) {
std::weak_ptr<S3FileSystem> weak_fs(fs_);
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream("bucket/somefile"));
fs_.reset();
- ASSERT_FALSE(weak_fs.expired());
ASSERT_OK_AND_ASSIGN(buf, stream->Read(10));
AssertBufferEqual(*buf, "some data");
ASSERT_OK(stream->Close());
diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc
index 4b867ea..501d5ca 100644
--- a/cpp/src/arrow/util/future.cc
+++ b/cpp/src/arrow/util/future.cc
@@ -348,6 +348,10 @@ Future<> AllComplete(const std::vector<Future<>>& futures)
{
std::atomic<size_t> n_remaining;
};
+ if (futures.empty()) {
+ return Future<>::MakeFinished();
+ }
+
auto state = std::make_shared<State>(futures.size());
auto out = Future<>::Make();
for (const auto& future : futures) {
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index fc63a42..4ede291 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -628,6 +628,10 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>>
futures) {
std::atomic<size_t> n_remaining;
};
+ if (futures.size() == 0) {
+ return {std::vector<Result<T>>{}};
+ }
+
auto state = std::make_shared<State>(std::move(futures));
auto out = Future<std::vector<Result<T>>>::Make();
diff --git a/cpp/src/arrow/util/future_test.cc
b/cpp/src/arrow/util/future_test.cc
index 0bd8380..0fc202e 100644
--- a/cpp/src/arrow/util/future_test.cc
+++ b/cpp/src/arrow/util/future_test.cc
@@ -971,6 +971,13 @@ TEST(FutureCompletionTest, FutureVoid) {
}
}
+TEST(FutureAllTest, Empty) {
+ auto combined = arrow::All(std::vector<Future<int>>{});
+ auto after_assert = combined.Then(
+ [](std::vector<Result<int>> results) { ASSERT_EQ(0, results.size()); });
+ AssertSuccessful(after_assert);
+}
+
TEST(FutureAllTest, Simple) {
auto f1 = Future<int>::Make();
auto f2 = Future<int>::Make();
@@ -1012,11 +1019,16 @@ TEST(FutureAllTest, Failure) {
AssertFinished(after_assert);
}
+TEST(FutureAllCompleteTest, Empty) {
+ Future<> combined = AllComplete(std::vector<Future<>>{});
+ AssertSuccessful(combined);
+}
+
TEST(FutureAllCompleteTest, Simple) {
auto f1 = Future<int>::Make();
auto f2 = Future<int>::Make();
std::vector<Future<>> futures = {Future<>(f1), Future<>(f2)};
- auto combined = arrow::AllComplete(futures);
+ auto combined = AllComplete(futures);
AssertNotFinished(combined);
f2.MarkFinished(2);
AssertNotFinished(combined);
@@ -1029,7 +1041,7 @@ TEST(FutureAllCompleteTest, Failure) {
auto f2 = Future<int>::Make();
auto f3 = Future<int>::Make();
std::vector<Future<>> futures = {Future<>(f1), Future<>(f2), Future<>(f3)};
- auto combined = arrow::AllComplete(futures);
+ auto combined = AllComplete(futures);
AssertNotFinished(combined);
f1.MarkFinished(1);
AssertNotFinished(combined);