kou commented on code in PR #39009:
URL: https://github.com/apache/arrow/pull/39009#discussion_r1413053977


##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,
+                        OnContainer&& on_container) const {
+    Azure::Storage::Blobs::ListBlobContainersOptions options;
+    // Deleted containers are not returned.
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+    try {
+      auto container_list_response =
+          blob_service_client_->ListBlobContainers(options, context);
+      for (; container_list_response.HasPage();
+           container_list_response.MoveToNextPage(context)) {
+        for (const auto& container : container_list_response.BlobContainers) {
+          RETURN_NOT_OK(on_container(container));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus("Failed to list account containers.", 
exception);
+    }
+    return Status::OK();
+  }
+
+  static FileInfo FileInfoFromBlob(const std::string& container,
+                                   const 
Azure::Storage::Blobs::Models::BlobItem& blob) {
+    if (blob.Name.back() == internal::kSep) {
+      return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+    }
+    std::string path;
+    path.reserve(container.size() + 1 + blob.Name.size());
+    path += container;
+    path += internal::kSep;
+    path += blob.Name;
+    FileInfo info{std::move(path), FileType::File};
+    info.set_size(blob.BlobSize);
+    
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+    return info;
+  }
+
+  static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+                    FileType::Directory};
+  }
+
+  static std::string_view BasenameView(std::string_view s) {
+    auto offset = s.find_last_of(internal::kSep);
+    auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+    return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+  }
+
+  /// \brief List the blobs at the root of a container or some dir in a 
container.
+  ///
+  /// \pre container_client is the client for the container named like the 
first
+  /// segment of select.base_dir.
+  Status GetFileInfoWithSelectorFromContainer(
+      const Azure::Storage::Blobs::BlobContainerClient& container_client,
+      const Azure::Core::Context& context, Azure::Nullable<int32_t> 
page_size_hint,
+      const FileSelector& select, FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    bool found = false;
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+      // If the base_dir is the root of the container, then we want to list 
all blobs in
+      // the container and the Prefix should be empty and not even include the 
trailing
+      // slash because the container itself represents the `<container>/` 
directory.
+      options.Prefix = {};
+      found = true;  // Unless the container itself is not found later!
+    } else {
+      options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+    }
+    options.PageSizeHint = page_size_hint;
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+    // When Prefix.Value() contains a trailing slash and we find a blob that
+    // matches it completely, it is an empty directory marker blob for the
+    // directory we're listing from, and we should skip it.
+    auto is_empty_dir_marker =
+        [&options](const Azure::Storage::Blobs::Models::BlobItem& blob) 
noexcept -> bool {
+      return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();

Review Comment:
   It seems that we just need to check the trailing slash: 
`HasTraiingSlash(blob.Name)`
   
   ```diff
   diff --git a/cpp/src/arrow/filesystem/path_util.cc 
b/cpp/src/arrow/filesystem/path_util.cc
   index 8931ff7f8..84b3b210c 100644
   --- a/cpp/src/arrow/filesystem/path_util.cc
   +++ b/cpp/src/arrow/filesystem/path_util.cc
   @@ -194,7 +194,7 @@ Status AssertNoTrailingSlash(std::string_view key) {
      return Status::OK();
    }
    
   -bool HasTrailingSlash(std::string_view key) { return key.back() == '/'; }
   +bool HasTrailingSlash(std::string_view key) { return !key.empty() && 
key.back() == '/'; }
    
    bool HasLeadingSlash(std::string_view key) { return key.front() == '/'; }
    ```



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,
+                        OnContainer&& on_container) const {
+    Azure::Storage::Blobs::ListBlobContainersOptions options;
+    // Deleted containers are not returned.
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+    try {
+      auto container_list_response =
+          blob_service_client_->ListBlobContainers(options, context);
+      for (; container_list_response.HasPage();
+           container_list_response.MoveToNextPage(context)) {
+        for (const auto& container : container_list_response.BlobContainers) {
+          RETURN_NOT_OK(on_container(container));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus("Failed to list account containers.", 
exception);
+    }
+    return Status::OK();
+  }
+
+  static FileInfo FileInfoFromBlob(const std::string& container,
+                                   const 
Azure::Storage::Blobs::Models::BlobItem& blob) {
+    if (blob.Name.back() == internal::kSep) {
+      return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+    }
+    std::string path;
+    path.reserve(container.size() + 1 + blob.Name.size());
+    path += container;
+    path += internal::kSep;
+    path += blob.Name;
+    FileInfo info{std::move(path), FileType::File};
+    info.set_size(blob.BlobSize);
+    
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+    return info;
+  }
+
+  static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+                    FileType::Directory};
+  }
+
+  static std::string_view BasenameView(std::string_view s) {
+    auto offset = s.find_last_of(internal::kSep);
+    auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+    return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+  }
+
+  /// \brief List the blobs at the root of a container or some dir in a 
container.
+  ///
+  /// \pre container_client is the client for the container named like the 
first
+  /// segment of select.base_dir.
+  Status GetFileInfoWithSelectorFromContainer(
+      const Azure::Storage::Blobs::BlobContainerClient& container_client,
+      const Azure::Core::Context& context, Azure::Nullable<int32_t> 
page_size_hint,
+      const FileSelector& select, FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    bool found = false;
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+      // If the base_dir is the root of the container, then we want to list 
all blobs in
+      // the container and the Prefix should be empty and not even include the 
trailing
+      // slash because the container itself represents the `<container>/` 
directory.
+      options.Prefix = {};
+      found = true;  // Unless the container itself is not found later!
+    } else {
+      options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+    }
+    options.PageSizeHint = page_size_hint;
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+    // When Prefix.Value() contains a trailing slash and we find a blob that
+    // matches it completely, it is an empty directory marker blob for the
+    // directory we're listing from, and we should skip it.
+    auto is_empty_dir_marker =
+        [&options](const Azure::Storage::Blobs::Models::BlobItem& blob) 
noexcept -> bool {
+      return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
+    };
+
+    auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
+      if (select.recursive && select.max_recursion > 0) {
+        FileSelector sub_select;
+        sub_select.base_dir = base_location.container;
+        sub_select.base_dir += internal::kSep;
+        sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
+        sub_select.allow_not_found = true;
+        sub_select.recursive = true;
+        sub_select.max_recursion = select.max_recursion - 1;
+        return GetFileInfoWithSelectorFromContainer(
+            container_client, context, page_size_hint, sub_select, 
acc_results);
+      }
+      return Status::OK();
+    };
+
+    // (*acc_results)[*last_dir_reported] is the last FileType::Directory in 
the results
+    // produced through this loop over the response pages.
+    std::optional<size_t> last_dir_reported{};
+    auto matches_last_dir_reported = [&last_dir_reported,
+                                      acc_results](const FileInfo& info) 
noexcept {
+      if (!last_dir_reported.has_value() || info.type() != 
FileType::Directory) {
+        return false;
+      }
+      const auto& last_dir = (*acc_results)[*last_dir_reported];
+      return BasenameView(info.path()) == BasenameView(last_dir.path());
+    };
+
+    auto process_blob =
+        [&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
+          if (!is_empty_dir_marker(blob)) {
+            const auto& info = acc_results->emplace_back(
+                FileInfoFromBlob(base_location.container, blob));
+            if (info.type() == FileType::Directory) {
+              last_dir_reported = acc_results->size() - 1;
+            }
+          }
+        };
+    auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
+      const std::string path = base_location.container + internal::kSep + 
prefix;
+      const auto& info = 
acc_results->emplace_back(DirectoryFileInfoFromPath(path));
+      if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
+        acc_results->pop_back();
+      } else {
+        last_dir_reported = acc_results->size() - 1;
+        return recurse(prefix);
+      }
+      return Status::OK();
+    };
+
+    try {
+      auto list_response =
+          container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, 
context);

Review Comment:
   I think that using `internal::kSep` is better here but it's a `char` not 
`char*`...



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1103,7 +1330,12 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const 
std::string& path) {
 }
 
 Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& 
select) {
-  return Status::NotImplemented("The Azure FileSystem is not fully 
implemented");
+  Azure::Core::Context context;
+  Azure::Nullable<int32_t> page_size_hint;  // unspecified

Review Comment:
   I don't object them but why do you want to specify them explicitly?
   Our other methods don't specify them explicitly. (They use the default 
value.)



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,
+                        OnContainer&& on_container) const {
+    Azure::Storage::Blobs::ListBlobContainersOptions options;
+    // Deleted containers are not returned.
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+    try {
+      auto container_list_response =
+          blob_service_client_->ListBlobContainers(options, context);
+      for (; container_list_response.HasPage();
+           container_list_response.MoveToNextPage(context)) {
+        for (const auto& container : container_list_response.BlobContainers) {
+          RETURN_NOT_OK(on_container(container));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus("Failed to list account containers.", 
exception);
+    }
+    return Status::OK();
+  }
+
+  static FileInfo FileInfoFromBlob(const std::string& container,
+                                   const 
Azure::Storage::Blobs::Models::BlobItem& blob) {
+    if (blob.Name.back() == internal::kSep) {
+      return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+    }
+    std::string path;
+    path.reserve(container.size() + 1 + blob.Name.size());
+    path += container;
+    path += internal::kSep;
+    path += blob.Name;
+    FileInfo info{std::move(path), FileType::File};
+    info.set_size(blob.BlobSize);
+    
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+    return info;
+  }
+
+  static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+                    FileType::Directory};
+  }
+
+  static std::string_view BasenameView(std::string_view s) {
+    auto offset = s.find_last_of(internal::kSep);
+    auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+    return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+  }
+
+  /// \brief List the blobs at the root of a container or some dir in a 
container.
+  ///
+  /// \pre container_client is the client for the container named like the 
first
+  /// segment of select.base_dir.
+  Status GetFileInfoWithSelectorFromContainer(
+      const Azure::Storage::Blobs::BlobContainerClient& container_client,
+      const Azure::Core::Context& context, Azure::Nullable<int32_t> 
page_size_hint,
+      const FileSelector& select, FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    bool found = false;
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+      // If the base_dir is the root of the container, then we want to list 
all blobs in
+      // the container and the Prefix should be empty and not even include the 
trailing
+      // slash because the container itself represents the `<container>/` 
directory.
+      options.Prefix = {};
+      found = true;  // Unless the container itself is not found later!
+    } else {
+      options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+    }
+    options.PageSizeHint = page_size_hint;
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+    // When Prefix.Value() contains a trailing slash and we find a blob that
+    // matches it completely, it is an empty directory marker blob for the
+    // directory we're listing from, and we should skip it.
+    auto is_empty_dir_marker =
+        [&options](const Azure::Storage::Blobs::Models::BlobItem& blob) 
noexcept -> bool {
+      return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
+    };
+
+    auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
+      if (select.recursive && select.max_recursion > 0) {
+        FileSelector sub_select;
+        sub_select.base_dir = base_location.container;
+        sub_select.base_dir += internal::kSep;
+        sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
+        sub_select.allow_not_found = true;
+        sub_select.recursive = true;
+        sub_select.max_recursion = select.max_recursion - 1;
+        return GetFileInfoWithSelectorFromContainer(
+            container_client, context, page_size_hint, sub_select, 
acc_results);
+      }
+      return Status::OK();
+    };
+
+    // (*acc_results)[*last_dir_reported] is the last FileType::Directory in 
the results
+    // produced through this loop over the response pages.
+    std::optional<size_t> last_dir_reported{};
+    auto matches_last_dir_reported = [&last_dir_reported,
+                                      acc_results](const FileInfo& info) 
noexcept {
+      if (!last_dir_reported.has_value() || info.type() != 
FileType::Directory) {
+        return false;
+      }
+      const auto& last_dir = (*acc_results)[*last_dir_reported];
+      return BasenameView(info.path()) == BasenameView(last_dir.path());
+    };
+
+    auto process_blob =
+        [&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
+          if (!is_empty_dir_marker(blob)) {
+            const auto& info = acc_results->emplace_back(
+                FileInfoFromBlob(base_location.container, blob));
+            if (info.type() == FileType::Directory) {
+              last_dir_reported = acc_results->size() - 1;
+            }
+          }
+        };
+    auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
+      const std::string path = base_location.container + internal::kSep + 
prefix;
+      const auto& info = 
acc_results->emplace_back(DirectoryFileInfoFromPath(path));
+      if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
+        acc_results->pop_back();
+      } else {
+        last_dir_reported = acc_results->size() - 1;
+        return recurse(prefix);
+      }
+      return Status::OK();
+    };
+
+    try {
+      auto list_response =
+          container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, 
context);
+      for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
+        if (list_response.Blobs.empty() && list_response.BlobPrefixes.empty()) 
{
+          continue;
+        }
+        found = true;
+        // Blob and BlobPrefixes are sorted by name, so we can merge-iterate
+        // them to ensure returned results are all sorted.
+        size_t blob_index = 0;
+        size_t blob_prefix_index = 0;
+        while (blob_index < list_response.Blobs.size() &&
+               blob_prefix_index < list_response.BlobPrefixes.size()) {
+          const auto& blob = list_response.Blobs[blob_index];
+          const auto& prefix = list_response.BlobPrefixes[blob_prefix_index];
+          const int cmp = blob.Name.compare(prefix);
+          if (cmp < 0) {
+            process_blob(blob);
+            blob_index += 1;
+          } else if (cmp > 0) {
+            RETURN_NOT_OK(process_prefix(prefix));
+            blob_prefix_index += 1;
+          } else {  // there is a blob (empty dir marker) and a prefix with 
the same name
+            DCHECK_EQ(blob.Name, prefix);
+            RETURN_NOT_OK(process_prefix(prefix));
+            blob_index += 1;
+            blob_prefix_index += 1;
+          }
+        }
+        for (; blob_index < list_response.Blobs.size(); blob_index++) {
+          process_blob(list_response.Blobs[blob_index]);
+        }
+        for (; blob_prefix_index < list_response.BlobPrefixes.size();
+             blob_prefix_index++) {
+          
RETURN_NOT_OK(process_prefix(list_response.BlobPrefixes[blob_prefix_index]));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      if (exception.ErrorCode == "ContainerNotFound") {
+        found = false;
+      } else {
+        return internal::ExceptionToStatus(
+            "Failed to list blobs in a directory: " + select.base_dir + ": " +
+                container_client.GetUrl(),
+            exception);
+      }
+    }
+
+    return found || select.allow_not_found
+               ? Status::OK()
+               : ::arrow::fs::internal::PathNotFound(select.base_dir);
+  }
+
+ public:
+  Status GetFileInfoWithSelector(const Azure::Core::Context& context,
+                                 Azure::Nullable<int32_t> page_size_hint,
+                                 const FileSelector& select,
+                                 FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    if (base_location.container.empty()) {
+      // Without a container, the base_location is equivalent to the filesystem
+      // root -- `/`. FileSelector::allow_not_found doesn't matter in this case
+      // because the root always exists.
+      auto on_container =
+          [&](const Azure::Storage::Blobs::Models::BlobContainerItem& 
container) {
+            // Deleted containers are not listed by ListContainers.
+            DCHECK(!container.IsDeleted);
+
+            // Every container is considered a directory.
+            FileInfo info{container.Name, FileType::Directory};
+            info.set_mtime(
+                
std::chrono::system_clock::time_point{container.Details.LastModified});
+            acc_results->push_back(std::move(info));
+
+            // Recurse into containers (subdirectories) if requested.
+            if (select.recursive && select.max_recursion > 0) {
+              FileSelector sub_select;
+              sub_select.base_dir = container.Name;
+              sub_select.allow_not_found = true;
+              sub_select.recursive = true;
+              sub_select.max_recursion = select.max_recursion - 1;
+              ARROW_RETURN_NOT_OK(GetFileInfoWithSelector(context, 
page_size_hint,
+                                                          sub_select, 
acc_results));
+            }
+            return Status::OK();
+          };
+      return ListContainers(context, std::move(on_container));

Review Comment:
   Should we pass `page_size_hint` here too?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,

Review Comment:
   How about renaming this to `EachContainer()`?
   Because this doesn't return a list of containers.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,
+                        OnContainer&& on_container) const {
+    Azure::Storage::Blobs::ListBlobContainersOptions options;
+    // Deleted containers are not returned.
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+    try {
+      auto container_list_response =
+          blob_service_client_->ListBlobContainers(options, context);
+      for (; container_list_response.HasPage();
+           container_list_response.MoveToNextPage(context)) {
+        for (const auto& container : container_list_response.BlobContainers) {
+          RETURN_NOT_OK(on_container(container));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus("Failed to list account containers.", 
exception);
+    }
+    return Status::OK();
+  }
+
+  static FileInfo FileInfoFromBlob(const std::string& container,
+                                   const 
Azure::Storage::Blobs::Models::BlobItem& blob) {
+    if (blob.Name.back() == internal::kSep) {
+      return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+    }
+    std::string path;
+    path.reserve(container.size() + 1 + blob.Name.size());
+    path += container;
+    path += internal::kSep;
+    path += blob.Name;
+    FileInfo info{std::move(path), FileType::File};
+    info.set_size(blob.BlobSize);
+    
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+    return info;
+  }
+
+  static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+                    FileType::Directory};
+  }
+
+  static std::string_view BasenameView(std::string_view s) {
+    auto offset = s.find_last_of(internal::kSep);
+    auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+    return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+  }
+
+  /// \brief List the blobs at the root of a container or some dir in a 
container.
+  ///
+  /// \pre container_client is the client for the container named like the 
first
+  /// segment of select.base_dir.
+  Status GetFileInfoWithSelectorFromContainer(
+      const Azure::Storage::Blobs::BlobContainerClient& container_client,
+      const Azure::Core::Context& context, Azure::Nullable<int32_t> 
page_size_hint,
+      const FileSelector& select, FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    bool found = false;
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+      // If the base_dir is the root of the container, then we want to list 
all blobs in
+      // the container and the Prefix should be empty and not even include the 
trailing
+      // slash because the container itself represents the `<container>/` 
directory.
+      options.Prefix = {};
+      found = true;  // Unless the container itself is not found later!
+    } else {
+      options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+    }
+    options.PageSizeHint = page_size_hint;
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+    // When Prefix.Value() contains a trailing slash and we find a blob that
+    // matches it completely, it is an empty directory marker blob for the
+    // directory we're listing from, and we should skip it.
+    auto is_empty_dir_marker =
+        [&options](const Azure::Storage::Blobs::Models::BlobItem& blob) 
noexcept -> bool {
+      return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
+    };
+
+    auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
+      if (select.recursive && select.max_recursion > 0) {
+        FileSelector sub_select;
+        sub_select.base_dir = base_location.container;
+        sub_select.base_dir += internal::kSep;
+        sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
+        sub_select.allow_not_found = true;
+        sub_select.recursive = true;
+        sub_select.max_recursion = select.max_recursion - 1;
+        return GetFileInfoWithSelectorFromContainer(
+            container_client, context, page_size_hint, sub_select, 
acc_results);
+      }
+      return Status::OK();
+    };
+
+    // (*acc_results)[*last_dir_reported] is the last FileType::Directory in 
the results
+    // produced through this loop over the response pages.
+    std::optional<size_t> last_dir_reported{};
+    auto matches_last_dir_reported = [&last_dir_reported,
+                                      acc_results](const FileInfo& info) 
noexcept {
+      if (!last_dir_reported.has_value() || info.type() != 
FileType::Directory) {
+        return false;
+      }
+      const auto& last_dir = (*acc_results)[*last_dir_reported];
+      return BasenameView(info.path()) == BasenameView(last_dir.path());
+    };
+
+    auto process_blob =
+        [&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
+          if (!is_empty_dir_marker(blob)) {
+            const auto& info = acc_results->emplace_back(
+                FileInfoFromBlob(base_location.container, blob));
+            if (info.type() == FileType::Directory) {
+              last_dir_reported = acc_results->size() - 1;
+            }
+          }
+        };
+    auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
+      const std::string path = base_location.container + internal::kSep + 
prefix;
+      const auto& info = 
acc_results->emplace_back(DirectoryFileInfoFromPath(path));
+      if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
+        acc_results->pop_back();
+      } else {
+        last_dir_reported = acc_results->size() - 1;
+        return recurse(prefix);
+      }
+      return Status::OK();
+    };
+
+    try {
+      auto list_response =
+          container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, 
context);

Review Comment:
   Can we use `ListBlobs()` instead when `select.max_recursion == INT32_MAX` as 
an optimization?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1103,7 +1330,12 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const 
std::string& path) {
 }
 
 Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& 
select) {
-  return Status::NotImplemented("The Azure FileSystem is not fully 
implemented");
+  Azure::Core::Context context;
+  Azure::Nullable<int32_t> page_size_hint;  // unspecified
+  FileInfoVector results;
+  RETURN_NOT_OK(
+      impl_->GetFileInfoWithSelector(context, page_size_hint, select, 
&results));
+  return std::move(results);

Review Comment:
   Do we need this `std::move()`?
   Does this prevent NRVO (Named Return Value Optimization)?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1103,7 +1330,12 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const 
std::string& path) {
 }
 
 Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& 
select) {
-  return Status::NotImplemented("The Azure FileSystem is not fully 
implemented");
+  Azure::Core::Context context;
+  Azure::Nullable<int32_t> page_size_hint;  // unspecified
+  FileInfoVector results;
+  RETURN_NOT_OK(
+      impl_->GetFileInfoWithSelector(context, page_size_hint, select, 
&results));

Review Comment:
   How about removing `WithSelector` suffix because `FileSystem::GetFileInfo()` 
doesn't use the suffix?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,
+                        OnContainer&& on_container) const {
+    Azure::Storage::Blobs::ListBlobContainersOptions options;
+    // Deleted containers are not returned.
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+    try {
+      auto container_list_response =
+          blob_service_client_->ListBlobContainers(options, context);
+      for (; container_list_response.HasPage();
+           container_list_response.MoveToNextPage(context)) {
+        for (const auto& container : container_list_response.BlobContainers) {
+          RETURN_NOT_OK(on_container(container));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus("Failed to list account containers.", 
exception);
+    }
+    return Status::OK();
+  }
+
+  static FileInfo FileInfoFromBlob(const std::string& container,
+                                   const 
Azure::Storage::Blobs::Models::BlobItem& blob) {
+    if (blob.Name.back() == internal::kSep) {
+      return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+    }
+    std::string path;
+    path.reserve(container.size() + 1 + blob.Name.size());
+    path += container;
+    path += internal::kSep;
+    path += blob.Name;
+    FileInfo info{std::move(path), FileType::File};
+    info.set_size(blob.BlobSize);
+    
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+    return info;
+  }
+
+  static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+                    FileType::Directory};
+  }
+
+  static std::string_view BasenameView(std::string_view s) {
+    auto offset = s.find_last_of(internal::kSep);
+    auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+    return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+  }
+
+  /// \brief List the blobs at the root of a container or some dir in a 
container.
+  ///
+  /// \pre container_client is the client for the container named like the 
first
+  /// segment of select.base_dir.
+  Status GetFileInfoWithSelectorFromContainer(
+      const Azure::Storage::Blobs::BlobContainerClient& container_client,
+      const Azure::Core::Context& context, Azure::Nullable<int32_t> 
page_size_hint,
+      const FileSelector& select, FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    bool found = false;
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+      // If the base_dir is the root of the container, then we want to list 
all blobs in
+      // the container and the Prefix should be empty and not even include the 
trailing
+      // slash because the container itself represents the `<container>/` 
directory.
+      options.Prefix = {};
+      found = true;  // Unless the container itself is not found later!
+    } else {
+      options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+    }
+    options.PageSizeHint = page_size_hint;
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+    // When Prefix.Value() contains a trailing slash and we find a blob that
+    // matches it completely, it is an empty directory marker blob for the
+    // directory we're listing from, and we should skip it.
+    auto is_empty_dir_marker =
+        [&options](const Azure::Storage::Blobs::Models::BlobItem& blob) 
noexcept -> bool {
+      return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
+    };
+
+    auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
+      if (select.recursive && select.max_recursion > 0) {
+        FileSelector sub_select;
+        sub_select.base_dir = base_location.container;
+        sub_select.base_dir += internal::kSep;
+        sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
+        sub_select.allow_not_found = true;
+        sub_select.recursive = true;
+        sub_select.max_recursion = select.max_recursion - 1;
+        return GetFileInfoWithSelectorFromContainer(
+            container_client, context, page_size_hint, sub_select, 
acc_results);
+      }
+      return Status::OK();
+    };
+
+    // (*acc_results)[*last_dir_reported] is the last FileType::Directory in 
the results
+    // produced through this loop over the response pages.
+    std::optional<size_t> last_dir_reported{};

Review Comment:
   Is this really needed?
   The added tests are passed without this:
   
   ```diff
   diff --git a/cpp/src/arrow/filesystem/azurefs.cc 
b/cpp/src/arrow/filesystem/azurefs.cc
   index ffcab5755..75f3ea68c 100644
   --- a/cpp/src/arrow/filesystem/azurefs.cc
   +++ b/cpp/src/arrow/filesystem/azurefs.cc
   @@ -913,15 +913,15 @@ class AzureFileSystem::Impl {
    
        // (*acc_results)[*last_dir_reported] is the last FileType::Directory 
in the results
        // produced through this loop over the response pages.
   -    std::optional<size_t> last_dir_reported{};
   -    auto matches_last_dir_reported = [&last_dir_reported,
   -                                      acc_results](const FileInfo& info) 
noexcept {
   -      if (!last_dir_reported.has_value() || info.type() != 
FileType::Directory) {
   -        return false;
   -      }
   -      const auto& last_dir = (*acc_results)[*last_dir_reported];
   -      return BasenameView(info.path()) == BasenameView(last_dir.path());
   -    };
   +    // std::optional<size_t> last_dir_reported{};
   +    // auto matches_last_dir_reported = [&last_dir_reported,
   +    //                                   acc_results](const FileInfo& info) 
noexcept {
   +    //   if (!last_dir_reported.has_value() || info.type() != 
FileType::Directory) {
   +    //     return false;
   +    //   }
   +    //   const auto& last_dir = (*acc_results)[*last_dir_reported];
   +    //   return BasenameView(info.path()) == BasenameView(last_dir.path());
   +    // };
    
        auto process_blob =
            [&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
   @@ -929,20 +929,21 @@ class AzureFileSystem::Impl {
                const auto& info = acc_results->emplace_back(
                    FileInfoFromBlob(base_location.container, blob));
                if (info.type() == FileType::Directory) {
   -              last_dir_reported = acc_results->size() - 1;
   +              // last_dir_reported = acc_results->size() - 1;
                }
              }
            };
        auto process_prefix = [&](const std::string& prefix) noexcept -> Status 
{
          const std::string path = base_location.container + internal::kSep + 
prefix;
          const auto& info = 
acc_results->emplace_back(DirectoryFileInfoFromPath(path));
   -      if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
   -        acc_results->pop_back();
   -      } else {
   -        last_dir_reported = acc_results->size() - 1;
   +      info.type();
   +      // if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
   +        // acc_results->pop_back();
   +      // } else {
   +        // last_dir_reported = acc_results->size() - 1;
            return recurse(prefix);
   -      }
   -      return Status::OK();
   +      // }
   +      // return Status::OK();
        };
    
        try {
   ```



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
     }
   }
 
+ private:
+  template <typename OnContainer>
+  Status ListContainers(const Azure::Core::Context& context,
+                        OnContainer&& on_container) const {
+    Azure::Storage::Blobs::ListBlobContainersOptions options;
+    // Deleted containers are not returned.
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+    try {
+      auto container_list_response =
+          blob_service_client_->ListBlobContainers(options, context);
+      for (; container_list_response.HasPage();
+           container_list_response.MoveToNextPage(context)) {
+        for (const auto& container : container_list_response.BlobContainers) {
+          RETURN_NOT_OK(on_container(container));
+        }
+      }
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus("Failed to list account containers.", 
exception);
+    }
+    return Status::OK();
+  }
+
+  static FileInfo FileInfoFromBlob(const std::string& container,
+                                   const 
Azure::Storage::Blobs::Models::BlobItem& blob) {
+    if (blob.Name.back() == internal::kSep) {
+      return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+    }
+    std::string path;
+    path.reserve(container.size() + 1 + blob.Name.size());
+    path += container;
+    path += internal::kSep;
+    path += blob.Name;
+    FileInfo info{std::move(path), FileType::File};
+    info.set_size(blob.BlobSize);
+    
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+    return info;
+  }
+
+  static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+    return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+                    FileType::Directory};
+  }
+
+  static std::string_view BasenameView(std::string_view s) {
+    auto offset = s.find_last_of(internal::kSep);
+    auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+    return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+  }
+
+  /// \brief List the blobs at the root of a container or some dir in a 
container.
+  ///
+  /// \pre container_client is the client for the container named like the 
first
+  /// segment of select.base_dir.
+  Status GetFileInfoWithSelectorFromContainer(
+      const Azure::Storage::Blobs::BlobContainerClient& container_client,
+      const Azure::Core::Context& context, Azure::Nullable<int32_t> 
page_size_hint,
+      const FileSelector& select, FileInfoVector* acc_results) {
+    ARROW_ASSIGN_OR_RAISE(auto base_location, 
AzureLocation::FromString(select.base_dir));
+
+    bool found = false;
+    Azure::Storage::Blobs::ListBlobsOptions options;
+    if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+      // If the base_dir is the root of the container, then we want to list 
all blobs in
+      // the container and the Prefix should be empty and not even include the 
trailing
+      // slash because the container itself represents the `<container>/` 
directory.
+      options.Prefix = {};
+      found = true;  // Unless the container itself is not found later!
+    } else {
+      options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+    }
+    options.PageSizeHint = page_size_hint;
+    options.Include = 
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+    // When Prefix.Value() contains a trailing slash and we find a blob that
+    // matches it completely, it is an empty directory marker blob for the
+    // directory we're listing from, and we should skip it.
+    auto is_empty_dir_marker =
+        [&options](const Azure::Storage::Blobs::Models::BlobItem& blob) 
noexcept -> bool {
+      return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
+    };
+
+    auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
+      if (select.recursive && select.max_recursion > 0) {
+        FileSelector sub_select;
+        sub_select.base_dir = base_location.container;
+        sub_select.base_dir += internal::kSep;
+        sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
+        sub_select.allow_not_found = true;
+        sub_select.recursive = true;
+        sub_select.max_recursion = select.max_recursion - 1;
+        return GetFileInfoWithSelectorFromContainer(
+            container_client, context, page_size_hint, sub_select, 
acc_results);
+      }
+      return Status::OK();
+    };
+
+    // (*acc_results)[*last_dir_reported] is the last FileType::Directory in 
the results
+    // produced through this loop over the response pages.
+    std::optional<size_t> last_dir_reported{};
+    auto matches_last_dir_reported = [&last_dir_reported,
+                                      acc_results](const FileInfo& info) 
noexcept {
+      if (!last_dir_reported.has_value() || info.type() != 
FileType::Directory) {
+        return false;
+      }
+      const auto& last_dir = (*acc_results)[*last_dir_reported];
+      return BasenameView(info.path()) == BasenameView(last_dir.path());
+    };
+
+    auto process_blob =
+        [&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
+          if (!is_empty_dir_marker(blob)) {
+            const auto& info = acc_results->emplace_back(
+                FileInfoFromBlob(base_location.container, blob));
+            if (info.type() == FileType::Directory) {
+              last_dir_reported = acc_results->size() - 1;
+            }
+          }
+        };
+    auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
+      const std::string path = base_location.container + internal::kSep + 
prefix;
+      const auto& info = 
acc_results->emplace_back(DirectoryFileInfoFromPath(path));
+      if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
+        acc_results->pop_back();
+      } else {
+        last_dir_reported = acc_results->size() - 1;
+        return recurse(prefix);
+      }
+      return Status::OK();
+    };
+
+    try {
+      auto list_response =
+          container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options, 
context);
+      for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
+        if (list_response.Blobs.empty() && list_response.BlobPrefixes.empty()) 
{
+          continue;
+        }
+        found = true;
+        // Blob and BlobPrefixes are sorted by name, so we can merge-iterate
+        // them to ensure returned results are all sorted.
+        size_t blob_index = 0;
+        size_t blob_prefix_index = 0;
+        while (blob_index < list_response.Blobs.size() &&
+               blob_prefix_index < list_response.BlobPrefixes.size()) {
+          const auto& blob = list_response.Blobs[blob_index];
+          const auto& prefix = list_response.BlobPrefixes[blob_prefix_index];
+          const int cmp = blob.Name.compare(prefix);
+          if (cmp < 0) {
+            process_blob(blob);
+            blob_index += 1;
+          } else if (cmp > 0) {
+            RETURN_NOT_OK(process_prefix(prefix));
+            blob_prefix_index += 1;
+          } else {  // there is a blob (empty dir marker) and a prefix with 
the same name

Review Comment:
   Is this really happen?
   `emptydir/` is a case of this in the test, right?
   In this case, `emptydir/` may exist in `Blobs` (with `options.Prefix = 
`emptydir/`) or `BlobPrefixes` (with no `options.Prefix`) but doesn't exit in 
both of them. (I checked this with `gdb`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to