westonpace commented on a change in pull request #9995:
URL: https://github.com/apache/arrow/pull/9995#discussion_r611712317
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T>
it, int readahead_queue_si
return MakeGeneratorIterator(std::move(owned_bg_generator));
}
+/// \brief Make a generator that returns a single pre-generated future
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
Review comment:
The Rx name for this is `of`, but that's pretty peculiar and wouldn't fit
here. Also, we haven't been following their naming scheme too closely anyway,
since their names tend to be unintuitive. I think this name is clear, so this
is just an FYI.
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T>
it, int readahead_queue_si
return MakeGeneratorIterator(std::move(owned_bg_generator));
}
+/// \brief Make a generator that returns a single pre-generated future
Review comment:
Can you add a note that this generator is async-reentrant? I hope no
one would ever intentionally add readahead to this, but I could see a situation
where a generator may be created from a known (cached) value or may have to
be fetched from a source that could use readahead. Also, it helps with
the consistency of the `MakeXyz` functions.
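For context, here is a minimal standalone sketch of why a generator like this can be called re-entrantly: it keeps no in-flight state between calls, so a second call never depends on the first one having completed. `ToyFuture` is a hypothetical stand-in (a null pointer plays the role of an invalid, moved-from future, and a default-constructed `T` plays the role of the end marker); it is not Arrow's `Future`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for a future: a null `impl` means "invalid".
template <typename T>
struct ToyFuture {
  std::shared_ptr<T> impl;
  bool is_valid() const { return impl != nullptr; }
};

// Toy version of a single-future generator. Each call only inspects and
// updates the stored future; it never waits on a previous call.
template <typename T>
std::function<ToyFuture<T>()> MakeSingleFutureGenerator(ToyFuture<T> future) {
  struct Generator {
    ToyFuture<T> future;

    ToyFuture<T> operator()() {
      if (future.is_valid()) {
        // Moving the shared_ptr out leaves the stored future invalid.
        return std::move(future);
      }
      // Toy end marker (assumption): a default-constructed T.
      return ToyFuture<T>{std::make_shared<T>()};
    }
  };
  return Generator{std::move(future)};
}
```

With these toy types, the first call to the returned generator yields the stored value and every later call yields the end marker, regardless of whether the caller has consumed the earlier result.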
##########
File path: cpp/src/arrow/filesystem/filesystem.h
##########
@@ -141,6 +142,19 @@ struct ARROW_EXPORT FileLocator {
std::string path;
};
+using FileInfoVector = std::vector<FileInfo>;
+using FileInfoGenerator = std::function<Future<FileInfoVector>()>;
+
+} // namespace fs
+
+template <>
+struct IterationTraits<fs::FileInfoVector> {
Review comment:
Should we just bite the bullet and add
`struct IterationTraits<std::vector<T>>`?
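A rough sketch of what such a generic specialization could look like, assuming an empty vector is used as the end-of-iteration marker. The primary template below is a stand-in declared only so the example compiles on its own, and `IsIterationEnd` is a helper written for this illustration.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for the primary template (declared here only for the sketch).
template <typename T>
struct IterationTraits;

// Hypothetical generic specialization: an empty vector marks the end.
template <typename T>
struct IterationTraits<std::vector<T>> {
  static std::vector<T> End() { return std::vector<T>(); }
  static bool IsEnd(const std::vector<T>& val) { return val.empty(); }
};

// Illustration-only helper showing how a consumer would test for the marker.
template <typename T>
bool IsIterationEnd(const T& val) {
  return IterationTraits<T>::IsEnd(val);
}
```

One consequence of this choice is that a legitimately empty batch cannot be told apart from the end marker, so producers would have to avoid emitting empty vectors.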
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>>
S3FileSystem::GetFileInfo(const FileSelector& sele
return results;
}
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_base_path = S3Path::FromString(select.base_dir);
+ if (!maybe_base_path.ok()) {
+ return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+ }
+ auto base_path = *std::move(maybe_base_path);
+
+ if (base_path.empty()) {
+ // List all buckets, then possibly recurse
+ PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+ auto producer = gen.producer();
+
+ auto fut = impl_->ListBucketsAsync(io_context());
+ auto impl = impl_->shared_from_this();
+ fut.AddCallback(
+ [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+ if (!res.ok()) {
+ producer.Push(res.status());
+ producer.Close();
+ return;
+ }
+ FileInfoVector buckets;
+ for (const auto& bucket : *res) {
+ buckets.push_back(FileInfo{bucket, FileType::Directory});
+ }
+ // Generate all bucket infos
+ producer.Push(MakeVectorGenerator(std::vector<FileInfoVector>{buckets}));
+ if (select.recursive) {
+ // Generate recursive walk for each bucket in turn
+ for (const auto& bucket : buckets) {
+ producer.Push(impl->WalkAsync(select, bucket.path(), ""));
+ }
+ }
+ producer.Close();
+ });
+
+ return MakeConcatenatedGenerator(
Review comment:
Why concatenated instead of merged? Concatenated will run one bucket at
a time while merged can run multiple.
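To make the difference concrete, here is a synchronous toy model of the two orderings. It is only an illustration: the sources are plain vectors and the "merge" is a fixed round-robin, whereas a real merged generator would interleave items in whatever order the underlying futures complete. `Source`, `Concatenate`, and `MergeRoundRobin` are names invented for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Source = std::vector<int>;

// Concatenation: fully drain one source before touching the next.
std::vector<int> Concatenate(const std::vector<Source>& sources) {
  std::vector<int> out;
  for (const auto& source : sources) {
    out.insert(out.end(), source.begin(), source.end());
  }
  return out;
}

// Merge (round-robin model): every unfinished source makes progress in
// each pass, so items from different sources interleave.
std::vector<int> MergeRoundRobin(const std::vector<Source>& sources) {
  std::vector<int> out;
  std::vector<std::size_t> pos(sources.size(), 0);
  bool progressed = true;
  while (progressed) {
    progressed = false;
    for (std::size_t i = 0; i < sources.size(); ++i) {
      if (pos[i] < sources[i].size()) {
        out.push_back(sources[i][pos[i]++]);
        progressed = true;
      }
    }
  }
  return out;
}
```

In this model, two "buckets" `{1, 2, 3}` and `{10, 20}` come out as `1, 2, 3, 10, 20` when concatenated and as `1, 10, 2, 20, 3` when merged.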
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T>
it, int readahead_queue_si
return MakeGeneratorIterator(std::move(owned_bg_generator));
}
+/// \brief Make a generator that returns a single pre-generated future
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+ assert(future.is_valid());
+ struct Generator {
+ Future<T> future;
+
+ Future<T> operator()() {
+ if (future.is_valid()) {
+ return std::move(future);
+ } else {
+ return IterationEnd<T>();
Review comment:
I've been using `AsyncGeneratorEnd`. I could probably consider merging
`AsyncGeneratorEnd` into `IterationEnd`. It ties back to the more general
question of whether we want to support the implicit conversion from `T` to
`Future<T>` via `Future<T>::MakeFinished`.
When I asked earlier, there was some preference for calling
`Future<T>::MakeFinished` explicitly where possible, but since the addition of
the conversion constructor I've noticed a few spots where that has been
missed.
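A small standalone sketch of the two spellings being compared. `ToyFuture` is a hypothetical, always-finished stand-in with a non-explicit converting constructor, and the toy `IterationEnd` simply returns a value-initialized `T`; neither is Arrow's actual implementation.

```cpp
#include <cassert>
#include <utility>

// Hypothetical stand-in for a future that is already finished with a value.
template <typename T>
class ToyFuture {
 public:
  // Non-explicit constructor: this is what allows `return value;` in a
  // function declared to return ToyFuture<T>.
  ToyFuture(T value) : value_(std::move(value)) {}

  // Explicit spelling of the same construction.
  static ToyFuture MakeFinished(T value) { return ToyFuture(std::move(value)); }

  const T& result() const { return value_; }

 private:
  T value_;
};

// Toy end-of-iteration marker (assumption): a value-initialized T.
template <typename T>
T IterationEnd() {
  return T();
}

// Relies on the implicit T -> ToyFuture<T> conversion.
template <typename T>
ToyFuture<T> EndImplicit() {
  return IterationEnd<T>();
}

// Spells the conversion out at the call site.
template <typename T>
ToyFuture<T> EndExplicit() {
  return ToyFuture<T>::MakeFinished(IterationEnd<T>());
}
```

Both functions produce the same result; the only difference is whether the wrapping into a future is visible where the value is returned.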
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>>
S3FileSystem::GetFileInfo(const FileSelector& sele
return results;
}
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_base_path = S3Path::FromString(select.base_dir);
+ if (!maybe_base_path.ok()) {
+ return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+ }
+ auto base_path = *std::move(maybe_base_path);
+
+ if (base_path.empty()) {
+ // List all buckets, then possibly recurse
+ PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+ auto producer = gen.producer();
Review comment:
Maybe make a future JIRA to abandon the walk if all references to `gen`
are lost?
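One possible shape for this, sketched with toy types: the producer holds only a `std::weak_ptr` to the shared queue, so a push fails once every consumer-side handle is gone and the walk can stop early. `QueueState`, `ToyGenerator`, and `Walk` are hypothetical names for this illustration, and the sketch is single-threaded (no locking), unlike what a real implementation would need.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>

// Shared queue owned by the consumer side.
struct QueueState {
  std::deque<int> items;
};

class ToyGenerator {
 public:
  ToyGenerator() : state_(std::make_shared<QueueState>()) {}

  class Producer {
   public:
    explicit Producer(std::weak_ptr<QueueState> state) : state_(std::move(state)) {}

    // Returns false once every ToyGenerator sharing the state is destroyed.
    bool Push(int value) {
      std::shared_ptr<QueueState> state = state_.lock();
      if (!state) return false;
      state->items.push_back(value);
      return true;
    }

   private:
    std::weak_ptr<QueueState> state_;
  };

  Producer producer() const { return Producer(state_); }

 private:
  std::shared_ptr<QueueState> state_;
};

// Pushes up to `n` items and abandons the walk as soon as a push fails.
// Returns the number of items actually pushed.
int Walk(ToyGenerator::Producer producer, int n) {
  int pushed = 0;
  for (int i = 0; i < n; ++i) {
    if (!producer.Push(i)) break;
    ++pushed;
  }
  return pushed;
}
```

While the generator is alive, `Walk` pushes everything it is asked to; after the last generator handle is dropped, the same producer pushes nothing.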
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1332,4 +1332,49 @@ Result<Iterator<T>> MakeReadaheadIterator(Iterator<T>
it, int readahead_queue_si
return MakeGeneratorIterator(std::move(owned_bg_generator));
}
+/// \brief Make a generator that returns a single pre-generated future
+template <typename T>
+std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
+ assert(future.is_valid());
+ struct Generator {
+ Future<T> future;
+
+ Future<T> operator()() {
+ if (future.is_valid()) {
+ return std::move(future);
+ } else {
+ return IterationEnd<T>();
+ }
+ }
+ };
+ return Generator{std::move(future)};
+}
+
+/// \brief Make a generator that always fails with a given error
+template <typename T>
+AsyncGenerator<T> MakeFailingGenerator(Status st) {
Review comment:
Another FYI: this would be called `throw` in Rx. We could name it
`raise`, but `raise` doesn't really combine with `Make`, and I think consistency
with `Make` would be nicer here. No change recommended.
Also, I've been tempted to create ARROW-12341 for a bit now. It will use
this.
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>>
S3FileSystem::GetFileInfo(const FileSelector& sele
return results;
}
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_base_path = S3Path::FromString(select.base_dir);
+ if (!maybe_base_path.ok()) {
+ return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+ }
+ auto base_path = *std::move(maybe_base_path);
+
+ if (base_path.empty()) {
+ // List all buckets, then possibly recurse
+ PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+ auto producer = gen.producer();
+
+ auto fut = impl_->ListBucketsAsync(io_context());
+ auto impl = impl_->shared_from_this();
+ fut.AddCallback(
+ [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+ if (!res.ok()) {
+ producer.Push(res.status());
Review comment:
Does `PushGenerator` not auto-close on a failing status? If not, can we
add a `Close(Status)`?
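As a sketch of what I have in mind, using toy types only: a `Close(status)` overload that records the error and closes in one step, so callers don't need the `Push(status)` / `Close()` pair. `ToyStatus`, `ToyItem`, and `ToyProducer` are hypothetical and do not reflect the actual `PushGenerator` interface.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Toy status: an empty message means OK.
struct ToyStatus {
  std::string message;
  bool ok() const { return message.empty(); }
};

// Toy queue item: a value, or an error if `status` is not OK.
struct ToyItem {
  int value;
  ToyStatus status;
};

class ToyProducer {
 public:
  // Pushes are ignored once the producer is closed.
  void Push(int value) {
    if (!closed_) items_.push_back(ToyItem{value, ToyStatus{}});
  }

  void Close() { closed_ = true; }

  // Hypothetical overload: deliver a final error (if any), then close.
  void Close(ToyStatus status) {
    if (!closed_ && !status.ok()) {
      items_.push_back(ToyItem{0, std::move(status)});
    }
    closed_ = true;
  }

  bool is_closed() const { return closed_; }
  const std::vector<ToyItem>& items() const { return items_; }

 private:
  std::vector<ToyItem> items_;
  bool closed_ = false;
};
```

With this shape, the error branch of the callback would collapse to a single `Close(status)` call followed by `return`.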
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1126,6 +1127,11 @@ struct TreeWalker : public
std::enable_shared_from_this<TreeWalker> {
template <typename... Args>
static Status Walk(Args&&... args) {
+ return WalkAsync(std::forward<Args>(args)...).status();
Review comment:
This pattern got me in trouble with nested deadlock. Imagine the thread
pool has 8 threads and we are calling `Walk` 10 times: every pool thread can
end up blocked inside `Walk`, waiting on tasks that have no free thread left
to run on. One potential workaround is to feed this through the new
`RunSerially`, which will avoid nested deadlock but could impact performance
(although it should be ok here).
Actually, I believe you are ok here. All tasks spawned by this call will
run on the I/O executor and do not transfer back.
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1762,6 +1851,50 @@ Result<std::vector<FileInfo>>
S3FileSystem::GetFileInfo(const FileSelector& sele
return results;
}
+FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ auto maybe_base_path = S3Path::FromString(select.base_dir);
+ if (!maybe_base_path.ok()) {
+ return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
+ }
+ auto base_path = *std::move(maybe_base_path);
+
+ if (base_path.empty()) {
+ // List all buckets, then possibly recurse
+ PushGenerator<AsyncGenerator<FileInfoVector>> gen;
+ auto producer = gen.producer();
+
+ auto fut = impl_->ListBucketsAsync(io_context());
+ auto impl = impl_->shared_from_this();
+ fut.AddCallback(
+ [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
+ if (!res.ok()) {
+ producer.Push(res.status());
+ producer.Close();
+ return;
+ }
+ FileInfoVector buckets;
+ for (const auto& bucket : *res) {
+ buckets.push_back(FileInfo{bucket, FileType::Directory});
+ }
+ // Generate all bucket infos
+ producer.Push(MakeVectorGenerator(std::vector<FileInfoVector>{buckets}));
Review comment:
Why not use `MakeSingleFutureGenerator`?
##########
File path: cpp/src/arrow/filesystem/test_util.cc
##########
@@ -111,6 +113,12 @@ void SortInfos(std::vector<FileInfo>* infos) {
std::sort(infos->begin(), infos->end(), FileInfo::ByPath{});
}
+void AssertFileInfoGenerator(FileInfoGenerator gen, FileInfoVector* out_infos) {
Review comment:
Naming nit: Perhaps don't name this `Assert`, since it isn't actually
asserting the contents but just collecting them? I see
`ASSERT_FINISHES_OK_AND_ASSIGN` as an analogue of `ASSERT_OK_AND_ASSIGN`, which
is more of a utility assertion and less of a test expectation.
##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -159,10 +161,11 @@ Future<std::vector<FileInfo>>
FileSystem::GetFileInfoAsync(
[paths](std::shared_ptr<FileSystem> self) { return self->GetFileInfo(paths); });
}
-Future<std::vector<FileInfo>> FileSystem::GetFileInfoAsync(const FileSelector& select) {
- return FileSystemDefer(
+FileInfoGenerator FileSystem::GetFileInfoGenerator(const FileSelector& select) {
Review comment:
I'm not sure if this is right. I would have expected
`AsyncGenerator<FileInfo>` and not `AsyncGenerator<FileInfoVector>`. The
latter is still going to block until all results are available. Is this meant
to be a step towards the former?
Never mind. I looked at the S3 generator now and I think I see. You are
generating multiple batches of `FileInfo`.