felipecrv commented on code in PR #39009:
URL: https://github.com/apache/arrow/pull/39009#discussion_r1414721290
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -815,6 +815,233 @@ class AzureFileSystem::Impl {
}
}
+ private:
+ template <typename OnContainer>
+ Status ListContainers(const Azure::Core::Context& context,
+ OnContainer&& on_container) const {
+ Azure::Storage::Blobs::ListBlobContainersOptions options;
+ // Deleted containers are not returned.
+ options.Include =
Azure::Storage::Blobs::Models::ListBlobContainersIncludeFlags::None;
+ try {
+ auto container_list_response =
+ blob_service_client_->ListBlobContainers(options, context);
+ for (; container_list_response.HasPage();
+ container_list_response.MoveToNextPage(context)) {
+ for (const auto& container : container_list_response.BlobContainers) {
+ RETURN_NOT_OK(on_container(container));
+ }
+ }
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus("Failed to list account containers.",
exception);
+ }
+ return Status::OK();
+ }
+
+ static FileInfo FileInfoFromBlob(const std::string& container,
+ const
Azure::Storage::Blobs::Models::BlobItem& blob) {
+ if (blob.Name.back() == internal::kSep) {
+ return DirectoryFileInfoFromPath(container + internal::kSep + blob.Name);
+ }
+ std::string path;
+ path.reserve(container.size() + 1 + blob.Name.size());
+ path += container;
+ path += internal::kSep;
+ path += blob.Name;
+ FileInfo info{std::move(path), FileType::File};
+ info.set_size(blob.BlobSize);
+
info.set_mtime(std::chrono::system_clock::time_point{blob.Details.LastModified});
+ return info;
+ }
+
+ static FileInfo DirectoryFileInfoFromPath(const std::string& path) {
+ return FileInfo{std::string{internal::RemoveTrailingSlash(path)},
+ FileType::Directory};
+ }
+
+ static std::string_view BasenameView(std::string_view s) {
+ auto offset = s.find_last_of(internal::kSep);
+ auto tail = (offset == std::string_view::npos) ? s : s.substr(offset);
+ return internal::RemoveTrailingSlash(tail, /*preserve_root=*/false);
+ }
+
+ /// \brief List the blobs at the root of a container or some dir in a
container.
+ ///
+ /// \pre container_client is the client for the container named like the
first
+ /// segment of select.base_dir.
+ Status GetFileInfoWithSelectorFromContainer(
+ const Azure::Storage::Blobs::BlobContainerClient& container_client,
+ const Azure::Core::Context& context, Azure::Nullable<int32_t>
page_size_hint,
+ const FileSelector& select, FileInfoVector* acc_results) {
+ ARROW_ASSIGN_OR_RAISE(auto base_location,
AzureLocation::FromString(select.base_dir));
+
+ bool found = false;
+ Azure::Storage::Blobs::ListBlobsOptions options;
+ if (internal::GetAbstractPathDepth(base_location.path) == 0) {
+ // If the base_dir is the root of the container, then we want to list
all blobs in
+ // the container and the Prefix should be empty and not even include the
trailing
+ // slash because the container itself represents the `<container>/`
directory.
+ options.Prefix = {};
+ found = true; // Unless the container itself is not found later!
+ } else {
+ options.Prefix = internal::EnsureTrailingSlash(base_location.path);
+ }
+ options.PageSizeHint = page_size_hint;
+ options.Include =
Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
+
+ // When Prefix.Value() contains a trailing slash and we find a blob that
+ // matches it completely, it is an empty directory marker blob for the
+ // directory we're listing from, and we should skip it.
+ auto is_empty_dir_marker =
+ [&options](const Azure::Storage::Blobs::Models::BlobItem& blob)
noexcept -> bool {
+ return options.Prefix.HasValue() && blob.Name == options.Prefix.Value();
+ };
+
+ auto recurse = [&](const std::string& blob_prefix) noexcept -> Status {
+ if (select.recursive && select.max_recursion > 0) {
+ FileSelector sub_select;
+ sub_select.base_dir = base_location.container;
+ sub_select.base_dir += internal::kSep;
+ sub_select.base_dir += internal::RemoveTrailingSlash(blob_prefix);
+ sub_select.allow_not_found = true;
+ sub_select.recursive = true;
+ sub_select.max_recursion = select.max_recursion - 1;
+ return GetFileInfoWithSelectorFromContainer(
+ container_client, context, page_size_hint, sub_select,
acc_results);
+ }
+ return Status::OK();
+ };
+
+ // (*acc_results)[*last_dir_reported] is the last FileType::Directory in
the results
+ // produced through this loop over the response pages.
+ std::optional<size_t> last_dir_reported{};
+ auto matches_last_dir_reported = [&last_dir_reported,
+ acc_results](const FileInfo& info)
noexcept {
+ if (!last_dir_reported.has_value() || info.type() !=
FileType::Directory) {
+ return false;
+ }
+ const auto& last_dir = (*acc_results)[*last_dir_reported];
+ return BasenameView(info.path()) == BasenameView(last_dir.path());
+ };
+
+ auto process_blob =
+ [&](const Azure::Storage::Blobs::Models::BlobItem& blob) noexcept {
+ if (!is_empty_dir_marker(blob)) {
+ const auto& info = acc_results->emplace_back(
+ FileInfoFromBlob(base_location.container, blob));
+ if (info.type() == FileType::Directory) {
+ last_dir_reported = acc_results->size() - 1;
+ }
+ }
+ };
+ auto process_prefix = [&](const std::string& prefix) noexcept -> Status {
+ const std::string path = base_location.container + internal::kSep +
prefix;
+ const auto& info =
acc_results->emplace_back(DirectoryFileInfoFromPath(path));
+ if (ARROW_PREDICT_FALSE(matches_last_dir_reported(info))) {
+ acc_results->pop_back();
+ } else {
+ last_dir_reported = acc_results->size() - 1;
+ return recurse(prefix);
+ }
+ return Status::OK();
+ };
+
+ try {
+ auto list_response =
+ container_client.ListBlobsByHierarchy(/*delimiter=*/"/", options,
context);
+ for (; list_response.HasPage(); list_response.MoveToNextPage(context)) {
+ if (list_response.Blobs.empty() && list_response.BlobPrefixes.empty())
{
+ continue;
+ }
+ found = true;
+ // Blob and BlobPrefixes are sorted by name, so we can merge-iterate
+ // them to ensure returned results are all sorted.
+ size_t blob_index = 0;
+ size_t blob_prefix_index = 0;
+ while (blob_index < list_response.Blobs.size() &&
+ blob_prefix_index < list_response.BlobPrefixes.size()) {
+ const auto& blob = list_response.Blobs[blob_index];
+ const auto& prefix = list_response.BlobPrefixes[blob_prefix_index];
+ const int cmp = blob.Name.compare(prefix);
+ if (cmp < 0) {
+ process_blob(blob);
+ blob_index += 1;
+ } else if (cmp > 0) {
+ RETURN_NOT_OK(process_prefix(prefix));
+ blob_prefix_index += 1;
+ } else { // there is a blob (empty dir marker) and a prefix with
the same name
+ DCHECK_EQ(blob.Name, prefix);
+ RETURN_NOT_OK(process_prefix(prefix));
+ blob_index += 1;
+ blob_prefix_index += 1;
+ }
+ }
+ for (; blob_index < list_response.Blobs.size(); blob_index++) {
+ process_blob(list_response.Blobs[blob_index]);
+ }
+ for (; blob_prefix_index < list_response.BlobPrefixes.size();
+ blob_prefix_index++) {
+
RETURN_NOT_OK(process_prefix(list_response.BlobPrefixes[blob_prefix_index]));
+ }
+ }
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.ErrorCode == "ContainerNotFound") {
+ found = false;
+ } else {
+ return internal::ExceptionToStatus(
+ "Failed to list blobs in a directory: " + select.base_dir + ": " +
+ container_client.GetUrl(),
+ exception);
+ }
+ }
+
+ return found || select.allow_not_found
+ ? Status::OK()
+ : ::arrow::fs::internal::PathNotFound(select.base_dir);
+ }
+
+ public:
+ Status GetFileInfoWithSelector(const Azure::Core::Context& context,
+ Azure::Nullable<int32_t> page_size_hint,
+ const FileSelector& select,
+ FileInfoVector* acc_results) {
+ ARROW_ASSIGN_OR_RAISE(auto base_location,
AzureLocation::FromString(select.base_dir));
+
+ if (base_location.container.empty()) {
+ // Without a container, the base_location is equivalent to the filesystem
+ // root -- `/`. FileSelector::allow_not_found doesn't matter in this case
+ // because the root always exists.
+ auto on_container =
+ [&](const Azure::Storage::Blobs::Models::BlobContainerItem&
container) {
+ // Deleted containers are not listed by ListContainers.
+ DCHECK(!container.IsDeleted);
+
+ // Every container is considered a directory.
+ FileInfo info{container.Name, FileType::Directory};
+ info.set_mtime(
+
std::chrono::system_clock::time_point{container.Details.LastModified});
+ acc_results->push_back(std::move(info));
+
+ // Recurse into containers (subdirectories) if requested.
+ if (select.recursive && select.max_recursion > 0) {
+ FileSelector sub_select;
+ sub_select.base_dir = container.Name;
+ sub_select.allow_not_found = true;
+ sub_select.recursive = true;
+ sub_select.max_recursion = select.max_recursion - 1;
+ ARROW_RETURN_NOT_OK(GetFileInfoWithSelector(context,
page_size_hint,
+ sub_select,
acc_results));
+ }
+ return Status::OK();
+ };
+ return ListContainers(context, std::move(on_container));
Review Comment:
`page_size_hint` only applies to the pagination of blobs. If I were to pass a
page size here, it would be a separate constant, but I thought that would be
overkill.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]