westonpace commented on code in PR #35440:
URL: https://github.com/apache/arrow/pull/35440#discussion_r1268939494


##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2068,197 +1932,306 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
-  Status CheckNestingDepth(int32_t nesting_depth) {
-    if (nesting_depth >= kMaxNestingDepth) {
-      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth 
(",
-                             kMaxNestingDepth, ")");
+  static FileInfo MakeDirectoryInfo(std::string dirname) {
+    FileInfo dir;
+    dir.set_type(FileType::Directory);
+    dir.set_path(std::move(dirname));
+    return dir;
+  }
+
+  static std::vector<FileInfo> MakeDirectoryInfos(std::vector<std::string> dirnames) {
+    std::vector<FileInfo> dir_infos;
+    for (auto& dirname : dirnames) {
+      dir_infos.push_back(MakeDirectoryInfo(std::move(dirname)));
     }
-    return Status::OK();
+    return dir_infos;
   }
 
-  // A helper class for Walk and WalkAsync
-  struct FileInfoCollector {
-    FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
-        : bucket(std::move(bucket)),
-          key(std::move(key)),
-          allow_not_found(select.allow_not_found) {}
+  using FileInfoSink = PushGenerator<std::vector<FileInfo>>::Producer;
 
-    Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
-                   std::vector<FileInfo>* out) {
-      // Walk "directories"
-      for (const auto& child_prefix : result.GetCommonPrefixes()) {
-        is_empty = false;
-        const auto child_key =
-            internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
-        FileInfo info;
-        info.set_path(child_path.str());
-        info.set_type(FileType::Directory);
-        out->push_back(std::move(info));
+  struct FileListerState {
+    FileInfoSink files_queue;
+    const bool allow_not_found;
+    const int max_recursion;
+
+    const bool include_implicit_dirs;
+    const io::IOContext io_context;
+    S3FileSystem::Impl* self;
+
+    S3Model::ListObjectsV2Request req;
+    std::unordered_set<std::string> directories;
+    bool empty = true;
+
+    FileListerState(PushGenerator<std::vector<FileInfo>>::Producer files_queue,
+                    FileSelector select, const std::string& bucket,
+                    const std::string& key, bool include_implicit_dirs,
+                    io::IOContext io_context, S3FileSystem::Impl* self)
+        : files_queue(std::move(files_queue)),
+          allow_not_found(select.allow_not_found),
+          max_recursion(select.max_recursion),
+          include_implicit_dirs(include_implicit_dirs),
+          io_context(io_context),
+          self(self) {
+      req.SetBucket(bucket);
+      req.SetMaxKeys(kListObjectsMaxKeys);
+      if (!key.empty()) {
+        req.SetPrefix(key + kSep);
       }
-      // Walk "files"
-      for (const auto& obj : result.GetContents()) {
-        is_empty = false;
-        FileInfo info;
-        const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
-        if (child_key == std::string_view(prefix)) {
-          // Amazon can return the "directory" key itself as part of the results, skip
-          continue;
-        }
-        std::stringstream child_path;
-        child_path << bucket << kSep << child_key;
-        info.set_path(child_path.str());
-        FileObjectToInfo(obj, &info);
-        out->push_back(std::move(info));
+      if (!select.recursive) {
+        req.SetDelimiter(Aws::String() + kSep);
       }
-      return Status::OK();
     }
 
-    Status Finish(Impl* impl) {
-      // If no contents were found, perhaps it's an empty "directory",
-      // or perhaps it's a nonexistent entry.  Check.
-      if (is_empty && !allow_not_found) {
-        ARROW_ASSIGN_OR_RAISE(bool is_actually_empty,
-                              impl->IsEmptyDirectory(bucket, key));
-        if (!is_actually_empty) {
-          return PathNotFound(bucket, key);
-        }
+    void Finish() {
+      // `empty` means that we didn't get a single file info back from S3.  This may be
+      // a situation that we should consider as PathNotFound.
+      //
+      // * If the prefix is empty then we were querying the contents of an entire bucket
+      //   and this is not a PathNotFound case because if the bucket didn't exist then
+      //   we would have received an error and not an empty set of results.
+      //
+      // * If the prefix is not empty then we asked for all files under a particular
+      //   directory.  S3 will also return the directory itself, if it exists.  So if
+      //   we get zero results then we know that there are no files under the directory
+      //   and the directory itself doesn't exist.  This should be considered PathNotFound
+      if (empty && !allow_not_found && !req.GetPrefix().empty()) {
+        files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix()));
       }
-      return Status::OK();
     }
 
-    std::string bucket;
-    std::string key;
-    bool allow_not_found;
-    bool is_empty = true;
+    // Given a path, iterate through all possible sub-paths and, if we haven't
+    // seen that sub-path before, return it.
+    //
+    // For example, given A/B/C we might return A/B and A if we have not seen
+    // those paths before.  This allows us to consider "implicit" directories which
+    // don't exist as objects in S3 but can be inferred.
+    std::vector<std::string> GetNewDirectories(const std::string_view& path) {
+      std::string current(path);
+      std::string base = req.GetBucket();
+      if (!req.GetPrefix().empty()) {
+        base = base + kSep + std::string(internal::RemoveTrailingSlash(req.GetPrefix()));
+      }
+      std::vector<std::string> new_directories;
+      while (true) {
+        const std::string parent_dir = internal::GetAbstractPathParent(current).first;
+        if (parent_dir.empty()) {
+          break;
+        }
+        current = parent_dir;
+        if (current == base) {
+          break;
+        }
+        if (directories.insert(parent_dir).second) {
+          new_directories.push_back(std::move(parent_dir));
+        }
+      }
+      return new_directories;
+    }
   };
 
-  // Workhorse for GetFileInfo(FileSelector...)
-  Status Walk(const FileSelector& select, const std::string& bucket,
-              const std::string& key, std::vector<FileInfo>* out) {
-    RETURN_NOT_OK(CheckS3Initialized());
+  struct FileListerTask : public util::AsyncTaskScheduler::Task {
+    std::shared_ptr<FileListerState> state;
+    util::AsyncTaskScheduler* scheduler;
 
-    FileInfoCollector collector(bucket, key, select);
+    FileListerTask(std::shared_ptr<FileListerState> state,
+                   util::AsyncTaskScheduler* scheduler)
+        : state(state), scheduler(scheduler) {}
 
-    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
+    std::vector<FileInfo> ToFileInfos(const std::string& bucket,
+                                      const std::string& prefix,
+                                      const S3Model::ListObjectsV2Result& result) {
+      std::vector<FileInfo> file_infos;
+      // If this is a non-recursive listing we may see "common prefixes" which represent
+      // directories we did not recurse into.  We will add those as directories.
+      for (const auto& child_prefix : result.GetCommonPrefixes()) {
+        const auto child_key =
+            internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
+        std::stringstream child_path_ss;
+        child_path_ss << bucket << kSep << child_key;
+        FileInfo info;
+        info.set_path(child_path_ss.str());
+        info.set_type(FileType::Directory);
+        file_infos.push_back(std::move(info));
       }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
-
-    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
-      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
-      return select.recursive && nesting_depth <= select.max_recursion;
-    };
-
-    auto handle_results = [&](const std::string& prefix,
-                              const S3Model::ListObjectsV2Result& result) -> Status {
-      return collector.Collect(prefix, result, out);
-    };
-
-    RETURN_NOT_OK(TreeWalker::Walk(holder_, io_context_, bucket, key, kListObjectsMaxKeys,
-                                   handle_results, handle_error, handle_recursion));
-
-    // If no contents were found, perhaps it's an empty "directory",
-    // or perhaps it's a nonexistent entry.  Check.
-    RETURN_NOT_OK(collector.Finish(this));
-    // Sort results for convenience, since they can come massively out of order
-    std::sort(out->begin(), out->end(), FileInfo::ByPath{});
-    return Status::OK();
-  }
-
-  // Workhorse for GetFileInfoGenerator(FileSelector...)
-  FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
-                              const std::string& key) {
-    PushGenerator<std::vector<FileInfo>> gen;
-    auto producer = gen.producer();
-    auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
-    auto self = shared_from_this();
+      // S3 doesn't have any concept of "max depth" and so we emulate it by counting the
+      // number of '/' characters.  E.g. if the user is searching bucket/subdirA/subdirB
+      // then the starting depth is 2.
+      // A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0.
+      // A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a
+      //   "depth" of 1
+      int base_depth = internal::GetAbstractPathDepth(prefix);
+      for (const auto& obj : result.GetContents()) {
+        if (obj.GetKey() == prefix) {
+          // S3 will return the basedir itself (if it is a file / empty file).  We don't
+          // want that.  But this is still considered "finding the basedir" and so we mark
+          // it "not empty".
+          state->empty = false;
+          continue;
+        }
+        std::string child_key =
+            std::string(internal::RemoveTrailingSlash(FromAwsString(obj.GetKey())));
+        bool had_trailing_slash = child_key.size() != obj.GetKey().size();
+        int child_depth = internal::GetAbstractPathDepth(child_key);
+        // Recursion depth is 1 smaller because a path with depth 1 (e.g. foo) is
+        // considered to have a "recursion" of 0
+        int recursion_depth = child_depth - base_depth - 1;
+        if (recursion_depth > state->max_recursion) {
+          // If we have A/B/C/D and max_recursion is 2 then we ignore this (don't add it
+          // to file_infos) but we still want to potentially add A and A/B as directories.
+          // So we "pretend" like we have a file A/B/C for the call to GetNewDirectories
+          // below
+          int to_trim = recursion_depth - state->max_recursion - 1;
+          if (to_trim > 0) {
+            child_key = bucket + kSep +
+                        internal::SliceAbstractPath(child_key, 0, child_depth - to_trim);
+          } else {
+            child_key = bucket + kSep + child_key;
+          }
+        } else {
+          // If the file isn't beyond our max recursion then count it as a file
+          // unless it's empty and then it depends on whether or not the file ends
+          // with a trailing slash
+          std::stringstream child_path_ss;
+          child_path_ss << bucket << kSep << child_key;
+          child_key = child_path_ss.str();
+          if (obj.GetSize() > 0 || !had_trailing_slash) {
+            // We found a real file
+            FileInfo info;
+            info.set_path(child_key);
+            FileObjectToInfo(obj, &info);
+            file_infos.push_back(std::move(info));
+          } else {
+            // We found an empty file and we want to treat it like a directory.  Only
+            // add it if we haven't seen this directory before.
+            if (state->directories.insert(child_key).second) {
+              file_infos.push_back(MakeDirectoryInfo(child_key));
+            }
+          }
+        }
 
-    auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
-      if (select.allow_not_found && IsNotFound(error)) {
-        return Status::OK();
+        if (state->include_implicit_dirs) {
+          // Now that we've dealt with the file itself we need to look at each of the
+          // parent paths and potentially add them as directories.  For example, after
+          // finding a file A/B/C/D we want to consider adding directories A, A/B, and
+          // A/B/C.
+          for (const auto& newdir : state->GetNewDirectories(child_key)) {
+            file_infos.push_back(MakeDirectoryInfo(newdir));
+          }
+        }
       }
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
-
-    auto handle_recursion = [producer, select,
-                             self](int32_t nesting_depth) -> Result<bool> {
-      if (producer.is_closed()) {
-        return false;
+      if (file_infos.size() > 0) {
+        state->empty = false;
       }
-      RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
-      return select.recursive && nesting_depth <= select.max_recursion;
-    };
+      return file_infos;
+    }
 
-    auto handle_results =
-        [collector, producer](
-            const std::string& prefix,
-            const S3Model::ListObjectsV2Result& result) mutable -> Status {
-      std::vector<FileInfo> out;
-      RETURN_NOT_OK(collector->Collect(prefix, result, &out));
-      if (!out.empty()) {
-        producer.Push(std::move(out));
+    void Run() {
+      // We are on an I/O thread now so just synchronously make the call and interpret the
+      // results.
+      Result<S3ClientLock> client_lock = state->self->holder_->Lock();
+      if (!client_lock.ok()) {
+        state->files_queue.Push(client_lock.status());
+        return;
       }
-      return Status::OK();
-    };
-
-    TreeWalker::WalkAsync(holder_, io_context_, bucket, key, kListObjectsMaxKeys,
-                          handle_results, handle_error, handle_recursion)
-        .AddCallback([collector, producer, self](const Status& status) mutable {
-          auto st = collector->Finish(self.get());
-          if (!st.ok()) {
-            producer.Push(st);
-          }
-          producer.Close();
-        });
-    return gen;
-  }
-
-  struct WalkResult {
-    std::vector<std::string> file_keys;
-    std::vector<std::string> dir_keys;
-  };
-  Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
-                                                            const std::string& key) {
-    auto state = std::make_shared<WalkResult>();
-
-    auto handle_results = [state](const std::string& prefix,
-                                  const S3Model::ListObjectsV2Result& result) -> Status {
-      // Walk "files"
-      state->file_keys.reserve(state->file_keys.size() + result.GetContents().size());
-      for (const auto& obj : result.GetContents()) {
-        state->file_keys.emplace_back(FromAwsString(obj.GetKey()));
+      S3Model::ListObjectsV2Outcome outcome =
+          client_lock->Move()->ListObjectsV2(state->req);
+      if (!outcome.IsSuccess()) {
+        const auto& err = outcome.GetError();
+        if (state->allow_not_found && IsNotFound(err)) {
+          return;
+        }
+        state->files_queue.Push(
+            ErrorToStatus(std::forward_as_tuple("When listing objects under key '",
+                                                state->req.GetPrefix(), "' in bucket '",
+                                                state->req.GetBucket(), "': "),
+                          "ListObjectsV2", err));
+        return;
       }
-      // Walk "directories"
-      state->dir_keys.reserve(state->dir_keys.size() + result.GetCommonPrefixes().size());
-      for (const auto& prefix : result.GetCommonPrefixes()) {
-        state->dir_keys.emplace_back(FromAwsString(prefix.GetPrefix()));
+      const S3Model::ListObjectsV2Result& result = outcome.GetResult();
+      // We could immediately schedule the continuation (if there are enough results to
+      // trigger paging) but that would introduce race condition complexity for arguably
+      // little benefit.
+      std::vector<FileInfo> file_infos =
+          ToFileInfos(state->req.GetBucket(), state->req.GetPrefix(), result);
+      if (file_infos.size() > 0) {
+        state->files_queue.Push(std::move(file_infos));
       }
-      return Status::OK();
-    };
 
-    auto handle_error = [=](const AWSError<S3Errors>& error) -> Status {
-      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
-                                                 "' in bucket '", bucket, "': "),
-                           "ListObjectsV2", error);
-    };
+      // If there are enough files to warrant a continuation then go ahead and schedule
+      // that now.
+      if (result.GetIsTruncated()) {
+        DCHECK(!result.GetNextContinuationToken().empty());
+        state->req.SetContinuationToken(result.GetNextContinuationToken());
+        scheduler->AddTask(std::make_unique<FileListerTask>(state, scheduler));
+      } else {
+        // Otherwise, we have finished listing all the files
+        state->Finish();
+      }
+    }
 
-    auto self = shared_from_this();
-    auto handle_recursion = [self](int32_t nesting_depth) -> Result<bool> {
-      RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
-      return true;  // Recurse
-    };
+    Result<Future<>> operator()() override {
+      return state->io_context.executor()->Submit([this] {
+        Run();
+        return Status::OK();
+      });
+    }
+    std::string_view name() const override { return "S3ListFiles"; }
+  };
 
-    return TreeWalker::WalkAsync(holder_, io_context_, bucket, key, kListObjectsMaxKeys,
-                                 handle_results, handle_error, handle_recursion)
-        .Then([state]() { return state; });
+  // Lists all files, potentially recursively, in a bucket
+  //
+  // include_implicit_dirs controls whether or not implicit directories should be
+  // included. These are directories that are not actually file objects but instead are
+  // inferred from other objects.
+  //
+  // For example, if a file exists with path A/B/C then implicit directories A/ and A/B/
+  // will exist even if there are no file objects with these paths.
+  void ListAsync(const FileSelector& select, const std::string& bucket,
+                 const std::string& key, bool include_implicit_dirs,
+                 util::AsyncTaskScheduler* scheduler, FileInfoSink sink,
+                 S3FileSystem::Impl* self) {

Review Comment:
   Yes.  Good catch.  I'm not sure what I was thinking.  I've removed these arguments.
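
   As an aside for readers following the depth arithmetic described in the new
   comments: here is a minimal, std-only sketch of the bookkeeping (PathDepth is a
   hypothetical stand-in for internal::GetAbstractPathDepth; the real code also
   handles trailing slashes and trims keys that are beyond max_recursion):

       #include <cassert>
       #include <string>

       // Count the '/'-separated segments of a key with no leading or trailing
       // slashes, e.g. "subdirA/subdirB" -> 2.
       int PathDepth(const std::string& key) {
         if (key.empty()) return 0;
         int depth = 1;
         for (char c : key) {
           if (c == '/') ++depth;
         }
         return depth;
       }

       int main() {
         // Searching under "subdirA/subdirB" gives a base depth of 2.
         const int base_depth = PathDepth("subdirA/subdirB");
         // A direct child has a recursion depth of 0 ...
         assert(PathDepth("subdirA/subdirB/somefile") - base_depth - 1 == 0);
         // ... and a grandchild has a recursion depth of 1, so it is only
         // listed as a file when FileSelector::max_recursion >= 1.
         assert(PathDepth("subdirA/subdirB/subdirC/somefile") - base_depth - 1 == 1);
         return 0;
       }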



##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2068,197 +1932,306 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
         "ListObjectsV2", outcome.GetError());
   }
 
+  // Lists all files, potentially recursively, in a bucket
+  //
+  // include_implicit_dirs controls whether or not implicit directories should be
+  // included. These are directories that are not actually file objects but instead are
+  // inferred from other objects.
+  //
+  // For example, if a file exists with path A/B/C then implicit directories A/ and A/B/
+  // will exist even if there are no file objects with these paths.
+  void ListAsync(const FileSelector& select, const std::string& bucket,
+                 const std::string& key, bool include_implicit_dirs,
+                 util::AsyncTaskScheduler* scheduler, FileInfoSink sink,
+                 S3FileSystem::Impl* self) {
+    // We can only fetch kListObjectsMaxKeys files at a time and so we create a
+    // scheduler and schedule a task to grab the first batch.  Once that's done we
+    // schedule a new task for the next batch.  All of these tasks share the same
+    // FileListerState object but none of these tasks run in parallel so there is
+    // no need to worry about mutexes
+    auto state = std::make_shared<FileListerState>(
+        sink, select, bucket, key, include_implicit_dirs, io_context_, self);
+
+    // Create the first file lister task (it may spawn more)
+    auto file_lister_task = std::make_unique<FileListerTask>(state, scheduler);
+    scheduler->AddTask(std::move(file_lister_task));
+  }
+
+  // Fully list all files from all buckets
+  void FullListAsync(bool include_implicit_dirs, util::AsyncTaskScheduler* scheduler,
+                     FileInfoSink sink, io::IOContext io_context, bool recursive,
+                     S3FileSystem::Impl* self) {

Review Comment:
   I've removed this argument
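
   For context, the implicit-directory inference above can be illustrated with a
   small, std-only sketch (NewDirectories is a hypothetical stand-in for
   FileListerState::GetNewDirectories; the real code uses
   internal::GetAbstractPathParent and stops at the request prefix):

       #include <iostream>
       #include <string>
       #include <unordered_set>
       #include <vector>

       // Walk up the parents of `current`, stopping at `base`, and emit each
       // parent the first time it is seen so every implicit directory is
       // reported exactly once.
       std::vector<std::string> NewDirectories(const std::string& base,
                                               std::string current,
                                               std::unordered_set<std::string>* seen) {
         std::vector<std::string> out;
         while (true) {
           const auto slash = current.find_last_of('/');
           if (slash == std::string::npos) break;
           current = current.substr(0, slash);
           if (current == base) break;
           if (seen->insert(current).second) out.push_back(current);
         }
         return out;
       }

       int main() {
         std::unordered_set<std::string> seen;
         // Finding bucket/A/B/C/D implies directories bucket/A/B/C, bucket/A/B,
         // and bucket/A (the base here is just the bucket itself).
         for (const auto& dir : NewDirectories("bucket", "bucket/A/B/C/D", &seen)) {
           std::cout << dir << "\n";
         }
         // A second file under the same tree adds no new directories.
         std::cout << NewDirectories("bucket", "bucket/A/B/E", &seen).size() << "\n";  // 0
         return 0;
       }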



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

