westonpace commented on code in PR #35440:
URL: https://github.com/apache/arrow/pull/35440#discussion_r1243015692
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public
std::enable_shared_from_this<S3FileSystem::Imp
"ListObjectsV2", outcome.GetError());
}
- Status CheckNestingDepth(int32_t nesting_depth) {
- if (nesting_depth >= kMaxNestingDepth) {
- return Status::IOError("S3 filesystem tree exceeds maximum nesting depth
(",
- kMaxNestingDepth, ")");
+ static FileInfo MakeDirectoryInfo(std::string dirname) {
+ FileInfo dir;
+ dir.set_type(FileType::Directory);
+ dir.set_path(dirname);
+ return dir;
+ }
+
+ static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string>
dirnames) {
+ std::vector<FileInfo> dir_infos;
+ for (auto& dirname : dirnames) {
+ dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
}
- return Status::OK();
+ return dir_infos;
}
- // A helper class for Walk and WalkAsync
- struct FileInfoCollector {
- FileInfoCollector(std::string bucket, std::string key, const FileSelector&
select)
- : bucket(std::move(bucket)),
- key(std::move(key)),
- allow_not_found(select.allow_not_found) {}
+ using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+ struct FileListerState {
+ FileInfoSink files_queue;
+ bool allow_not_found;
+ int max_recursion;
+ bool include_virtual;
+ S3Model::ListObjectsV2Request req;
+ io::IOContext io_context;
+ std::shared_ptr<Aws::S3::S3Client> client;
+ bool close_sink;
+ bool no_files_means_not_found;
+ std::unordered_set<std::string> directories;
+ bool empty = true;
+
+ FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+ FileSelector select, const std::string& bucket,
+ const std::string& key, bool include_virtual,
+ io::IOContext io_context,
std::shared_ptr<Aws::S3::S3Client> client,
+ bool close_sink)
+ : files_queue(std::move(files_queue)),
+ allow_not_found(select.allow_not_found),
+ max_recursion(select.max_recursion),
+ include_virtual(include_virtual),
+ io_context(io_context),
+ client(std::move(client)),
+ close_sink(close_sink),
+ no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+ req.SetBucket(bucket);
+ req.SetMaxKeys(kListObjectsMaxKeys);
+ if (!key.empty()) {
+ req.SetPrefix(key + kSep);
+ }
+ if (!select.recursive) {
+ req.SetDelimiter(Aws::String() + kSep);
+ }
+ }
+
+ // The FileListerState is kept alive by the various FileListerTasks. Once
all the
+ // tasks are finished this will be destroyed and we can run some cleanup
+ ~FileListerState() {
+ // * If the bucket doesn't exist we will have already gotten an error
from the
+ // ListObjectsV2 call
+ // * If the key is empty, and the bucket exists, then there is
+ // no way we can hit "not found"
+    // * If the key is not empty, then it's possible
+ // that the file itself didn't exist and there
+    //   were no files under it. In that case we will hit this if
statement and
+ // should treat this as a "not found" case.
+ if (empty && no_files_means_not_found) {
+ files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+ }
+ if (close_sink) {
+ files_queue.Close();
Review Comment:
Not necessarily the top-level task but it means that we've finished all
tasks. This happens as the final task is finishing. However, I agree this
makes more sense in the task continuation and I've moved it there (this code is
no longer a destructor which was kind of confusing).
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -1870,195 +1731,301 @@ class S3FileSystem::Impl : public
std::enable_shared_from_this<S3FileSystem::Imp
"ListObjectsV2", outcome.GetError());
}
- Status CheckNestingDepth(int32_t nesting_depth) {
- if (nesting_depth >= kMaxNestingDepth) {
- return Status::IOError("S3 filesystem tree exceeds maximum nesting depth
(",
- kMaxNestingDepth, ")");
+ static FileInfo MakeDirectoryInfo(std::string dirname) {
+ FileInfo dir;
+ dir.set_type(FileType::Directory);
+ dir.set_path(dirname);
+ return dir;
+ }
+
+ static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string>
dirnames) {
+ std::vector<FileInfo> dir_infos;
+ for (auto& dirname : dirnames) {
+ dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
}
- return Status::OK();
+ return dir_infos;
}
- // A helper class for Walk and WalkAsync
- struct FileInfoCollector {
- FileInfoCollector(std::string bucket, std::string key, const FileSelector&
select)
- : bucket(std::move(bucket)),
- key(std::move(key)),
- allow_not_found(select.allow_not_found) {}
+ using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
+
+ struct FileListerState {
+ FileInfoSink files_queue;
+ bool allow_not_found;
+ int max_recursion;
+ bool include_virtual;
+ S3Model::ListObjectsV2Request req;
+ io::IOContext io_context;
+ std::shared_ptr<Aws::S3::S3Client> client;
+ bool close_sink;
+ bool no_files_means_not_found;
+ std::unordered_set<std::string> directories;
+ bool empty = true;
+
+ FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+ FileSelector select, const std::string& bucket,
+ const std::string& key, bool include_virtual,
+ io::IOContext io_context,
std::shared_ptr<Aws::S3::S3Client> client,
+ bool close_sink)
+ : files_queue(std::move(files_queue)),
+ allow_not_found(select.allow_not_found),
+ max_recursion(select.max_recursion),
+ include_virtual(include_virtual),
+ io_context(io_context),
+ client(std::move(client)),
+ close_sink(close_sink),
+ no_files_means_not_found(!select.allow_not_found && !key.empty()) {
+ req.SetBucket(bucket);
+ req.SetMaxKeys(kListObjectsMaxKeys);
+ if (!key.empty()) {
+ req.SetPrefix(key + kSep);
+ }
+ if (!select.recursive) {
+ req.SetDelimiter(Aws::String() + kSep);
+ }
+ }
+
+ // The FileListerState is kept alive by the various FileListerTasks. Once
all the
+ // tasks are finished this will be destroyed and we can run some cleanup
+ ~FileListerState() {
+ // * If the bucket doesn't exist we will have already gotten an error
from the
+ // ListObjectsV2 call
+ // * If the key is empty, and the bucket exists, then there is
+ // no way we can hit "not found"
+    // * If the key is not empty, then it's possible
+ // that the file itself didn't exist and there
+    //   were no files under it. In that case we will hit this if
statement and
+ // should treat this as a "not found" case.
+ if (empty && no_files_means_not_found) {
+ files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
+ }
+ if (close_sink) {
+ files_queue.Close();
+ }
+ }
- Status Collect(const std::string& prefix, const
S3Model::ListObjectsV2Result& result,
- std::vector<FileInfo>* out) {
- // Walk "directories"
+ std::vector<std::string> GetNewDirectories(const std::string_view& path) {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]