This is an automated email from the ASF dual-hosted git repository.

felipecrv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ce54b6a2a GH-38704: [C++] Implement Azure FileSystem Move() via Azure 
DataLake Storage Gen 2 API (#39904)
0ce54b6a2a is described below

commit 0ce54b6a2ad7ee5dc48dc31eae4f10cb4cd2915d
Author: Felipe Oliveira Carvalho <felipe...@gmail.com>
AuthorDate: Fri Feb 9 23:14:20 2024 -0300

    GH-38704: [C++] Implement Azure FileSystem Move() via Azure DataLake 
Storage Gen 2 API (#39904)
    
    ### Rationale for this change
    
    We need to move directories and files via the `arrow::FileSystem` interface.
    
    ### What changes are included in this PR?
    
     - A few filesystem error reporting improvements
     - A helper class to deal with Azure Storage leases [1]
     - The `Move()` implementation that can move files and directories within 
the same container on storage accounts with Hierarchical Namespace Support 
enabled
     - Lots of tests
    
    [1]: https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob
    
    ### Are these changes tested?
    
    Yes, by existing and a huge number of tests added by this PR. The test code 
introduced here should be extracted to a reusable test module that we can use 
to test move in other file system implementations.
    
    ### Are there any user-facing changes?
    
    No breaking changes, only new functionality.
    * Closes: #38704
    
    Authored-by: Felipe Oliveira Carvalho <felipe...@gmail.com>
    Signed-off-by: Felipe Oliveira Carvalho <felipe...@gmail.com>
---
 cpp/src/arrow/filesystem/azurefs.cc       | 725 ++++++++++++++++++++++++++++--
 cpp/src/arrow/filesystem/azurefs.h        |  19 +
 cpp/src/arrow/filesystem/azurefs_test.cc  | 499 +++++++++++++++++++-
 cpp/src/arrow/filesystem/util_internal.cc |  10 +
 cpp/src/arrow/filesystem/util_internal.h  |   6 +
 cpp/src/arrow/util/io_util.cc             |   7 +
 cpp/src/arrow/util/io_util.h              |   3 +
 7 files changed, 1216 insertions(+), 53 deletions(-)

diff --git a/cpp/src/arrow/filesystem/azurefs.cc 
b/cpp/src/arrow/filesystem/azurefs.cc
index 87b9822878..d4bb445701 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <optional>
+
 #include "arrow/filesystem/azurefs.h"
 #include "arrow/filesystem/azurefs_internal.h"
 
@@ -283,18 +288,26 @@ struct AzureLocation {
 template <typename... PrefixArgs>
 Status ExceptionToStatus(const Storage::StorageException& exception,
                          PrefixArgs&&... prefix_args) {
-  return Status::IOError(std::forward<PrefixArgs>(prefix_args)...,
-                         " Azure Error: ", exception.what());
+  return Status::IOError(std::forward<PrefixArgs>(prefix_args)..., " Azure 
Error: [",
+                         exception.ErrorCode, "] ", exception.what());
 }
 
 Status PathNotFound(const AzureLocation& location) {
   return ::arrow::fs::internal::PathNotFound(location.all);
 }
 
+Status NotADir(const AzureLocation& location) {
+  return ::arrow::fs::internal::NotADir(location.all);
+}
+
 Status NotAFile(const AzureLocation& location) {
   return ::arrow::fs::internal::NotAFile(location.all);
 }
 
+Status NotEmpty(const AzureLocation& location) {
+  return ::arrow::fs::internal::NotEmpty(location.all);
+}
+
 Status ValidateFileLocation(const AzureLocation& location) {
   if (location.container.empty()) {
     return PathNotFound(location);
@@ -305,6 +318,23 @@ Status ValidateFileLocation(const AzureLocation& location) 
{
   return internal::AssertNoTrailingSlash(location.path);
 }
 
+Status InvalidDirMoveToSubdir(const AzureLocation& src, const AzureLocation& 
dest) {
+  return Status::Invalid("Cannot Move to '", dest.all, "' and make '", src.all,
+                         "' a sub-directory of itself.");
+}
+
+Status DestinationParentPathNotFound(const AzureLocation& dest) {
+  return Status::IOError("The parent directory of the destination path '", 
dest.all,
+                         "' does not exist.");
+}
+
+Status CrossContainerMoveNotImplemented(const AzureLocation& src,
+                                        const AzureLocation& dest) {
+  return Status::NotImplemented(
+      "Move of '", src.all, "' to '", dest.all,
+      "' requires moving data between containers, which is not implemented.");
+}
+
 bool IsContainerNotFound(const Storage::StorageException& e) {
   // In some situations, only the ReasonPhrase is set and the
   // ErrorCode is empty, so we check both.
@@ -942,6 +972,185 @@ FileInfo FileInfoFromBlob(std::string_view container,
   return info;
 }
 
+/// \brief RAII-style guard for releasing a lease on a blob or container.
+///
+/// The guard should be constructed right after a successful 
BlobLeaseClient::Acquire()
+/// call. Use std::optional<LeaseGuard> to declare a guard in outer scope and 
construct it
+/// later with std::optional::emplace(...).
+///
+/// Leases expire automatically, but explicit release means concurrent clients 
or
+/// ourselves when trying new operations on the same blob or container don't 
have
+/// to wait for the lease to expire by itself.
+///
+/// Learn more about leases at
+/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob
+class LeaseGuard {
+ public:
+  using SteadyClock = std::chrono::steady_clock;
+
+ private:
+  /// \brief The time when the lease expires or is broken.
+  ///
+  /// The lease is not guaranteed to be valid until this time, but it is 
guaranteed to
+  /// be expired after this time. In other words, this is an overestimation of
+  /// the true time_point.
+  SteadyClock::time_point break_or_expires_at_;
+  const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_;
+  bool release_attempt_pending_ = true;
+
+  /// \brief The latest known expiry time of a lease guarded by this class
+  /// that failed to be released or was forgotten by calling Forget().
+  static std::atomic<SteadyClock::time_point> latest_known_expiry_time_;
+
+  /// \brief The maximum lease duration supported by Azure Storage.
+  static constexpr std::chrono::seconds kMaxLeaseDuration{60};
+
+ public:
+  LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client,
+             std::chrono::seconds lease_duration)
+      : break_or_expires_at_(SteadyClock::now() +
+                             std::min(kMaxLeaseDuration, lease_duration)),
+        lease_client_(std::move(lease_client)) {
+    DCHECK(lease_duration <= kMaxLeaseDuration);
+    DCHECK(this->lease_client_);
+  }
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard);
+
+  ~LeaseGuard() {
+    // No point in trying any error handling here other than the debug 
checking. The lease
+    // will eventually expire on the backend without any intervention from us 
(just much
+    // later than if we released it).
+    [[maybe_unused]] auto status = Release();
+    ARROW_LOG(DEBUG) << status;
+  }
+
+  bool PendingRelease() const {
+    return release_attempt_pending_ && SteadyClock::now() <= 
break_or_expires_at_;
+  }
+
+ private:
+  Status DoRelease() {
+    DCHECK(release_attempt_pending_);
+    try {
+      lease_client_->Release();
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(exception, "Failed to release the ",
+                               lease_client_->GetLeaseId(), " lease");
+    }
+    return Status::OK();
+  }
+
+ public:
+  std::string LeaseId() const { return lease_client_->GetLeaseId(); }
+
+  bool StillValidFor(SteadyClock::duration expected_time_left) const {
+    return SteadyClock::now() + expected_time_left < break_or_expires_at_;
+  }
+
+  /// \brief Break the lease.
+  ///
+  /// The lease will stay in the "Breaking" state for break_period seconds or
+  /// less if the lease is expiring before that.
+  ///
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state
+  Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) {
+    auto remaining_time_ms = [this]() {
+      const auto remaining_time = break_or_expires_at_ - SteadyClock::now();
+      return 
std::chrono::duration_cast<std::chrono::milliseconds>(remaining_time);
+    };
+#ifndef NDEBUG
+    if (break_period.HasValue() && !StillValidFor(*break_period)) {
+      ARROW_LOG(WARNING)
+          << "Azure Storage: requested break_period ("
+          << break_period.ValueOr(std::chrono::seconds{0}).count()
+          << "s) is too long or lease duration is too short for all the 
operations "
+             "performed so far (lease expires in "
+          << remaining_time_ms().count() << "ms)";
+    }
+#endif
+    Blobs::BreakLeaseOptions options;
+    options.BreakPeriod = break_period;
+    try {
+      lease_client_->Break(options);
+      break_or_expires_at_ =
+          std::min(break_or_expires_at_,
+                   SteadyClock::now() + 
break_period.ValueOr(std::chrono::seconds{0}));
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(exception, "Failed to break the ",
+                               lease_client_->GetLeaseId(), " lease expiring 
in ",
+                               remaining_time_ms().count(), "ms");
+    }
+    return Status::OK();
+  }
+
+  /// \brief Break the lease before deleting or renaming the resource.
+  ///
+  /// Calling this is recommended when the resource for which the lease was 
acquired is
+  /// about to be deleted as there is no way of releasing the lease after 
that, we can
+  /// only forget about it. The break_period should be a conservative estimate 
of the time
+  /// it takes to delete/rename the resource.
+  ///
+  /// If break_period is too small, the delete/rename will fail with a lease 
conflict,
+  /// and if it's too large the only consequence is that a lease on a 
non-existent
+  /// resource will remain in the "Breaking" state for a while blocking others
+  /// from recreating the resource.
+  void BreakBeforeDeletion(std::chrono::seconds break_period) {
+    ARROW_CHECK_OK(Break(break_period));
+  }
+
+  // These functions are marked ARROW_NOINLINE because they are called from
+  // multiple locations, but are not performance-critical.
+
+  ARROW_NOINLINE Status Release() {
+    if (!PendingRelease()) {
+      return Status::OK();
+    }
+    auto status = DoRelease();
+    if (!status.ok()) {
+      Forget();
+      return status;
+    }
+    release_attempt_pending_ = false;
+    return Status::OK();
+  }
+
+  /// \brief Prevent any release attempts in the destructor.
+  ///
+  /// When it's known they would certainly fail.
+  /// \see LeaseGuard::BreakBeforeDeletion()
+  ARROW_NOINLINE void Forget() {
+    if (!PendingRelease()) {
+      release_attempt_pending_ = false;
+      return;
+    }
+    release_attempt_pending_ = false;
+    // Remember the latest known expiry time so we can gracefully handle lease
+    // acquisition failures by waiting until the latest forgotten lease.
+    auto latest = latest_known_expiry_time_.load(std::memory_order_relaxed);
+    while (
+        latest < break_or_expires_at_ &&
+        !latest_known_expiry_time_.compare_exchange_weak(latest, 
break_or_expires_at_)) {
+    }
+    DCHECK_GE(latest_known_expiry_time_.load(), break_or_expires_at_);
+  }
+
+  ARROW_NOINLINE static void WaitUntilLatestKnownExpiryTime() {
+    auto remaining_time = latest_known_expiry_time_.load() - 
SteadyClock::now();
+#ifndef NDEBUG
+    int64_t remaining_time_ms =
+        
std::chrono::duration_cast<std::chrono::milliseconds>(remaining_time).count();
+    ARROW_LOG(WARNING) << "LeaseGuard::WaitUntilLatestKnownExpiryTime for "
+                       << remaining_time_ms << "ms...";
+#endif
+    DCHECK(remaining_time <= kMaxLeaseDuration);
+    if (remaining_time > SteadyClock::duration::zero()) {
+      std::this_thread::sleep_for(remaining_time);
+    }
+  }
+};
+
 }  // namespace
 
 class AzureFileSystem::Impl {
@@ -975,6 +1184,11 @@ class AzureFileSystem::Impl {
     return blob_service_client_->GetBlobContainerClient(container_name);
   }
 
+  Blobs::BlobClient GetBlobClient(const std::string& container_name,
+                                  const std::string& blob_name) {
+    return GetBlobContainerClient(container_name).GetBlobClient(blob_name);
+  }
+
   /// \param container_name Also known as "filesystem" in the ADLS Gen2 API.
   DataLake::DataLakeFileSystemClient GetFileSystemClient(
       const std::string& container_name) {
@@ -1030,11 +1244,14 @@ class AzureFileSystem::Impl {
 
   /// \pre location.path is not empty.
   Result<FileInfo> GetFileInfo(const DataLake::DataLakeFileSystemClient& 
adlfs_client,
-                               const AzureLocation& location) {
+                               const AzureLocation& location,
+                               Azure::Nullable<std::string> lease_id = {}) {
     auto file_client = adlfs_client.GetFileClient(location.path);
+    DataLake::GetPathPropertiesOptions options;
+    options.AccessConditions.LeaseId = std::move(lease_id);
     try {
       FileInfo info{location.all};
-      auto properties = file_client.GetProperties();
+      auto properties = file_client.GetProperties(options);
       if (properties.Value.IsDirectory) {
         info.set_type(FileType::Directory);
       } else if (internal::HasTrailingSlash(location.path)) {
@@ -1115,6 +1332,22 @@ class AzureFileSystem::Impl {
     }
   }
 
+  Result<FileInfo> GetFileInfoOfPathWithinContainer(const AzureLocation& 
location) {
+    DCHECK(!location.container.empty() && !location.path.empty());
+    // There is a path to search within the container. Check HNS support to 
proceed.
+    auto adlfs_client = GetFileSystemClient(location.container);
+    ARROW_ASSIGN_OR_RAISE(auto hns_support, 
HierarchicalNamespaceSupport(adlfs_client));
+    if (hns_support == HNSSupport::kContainerNotFound) {
+      return FileInfo{location.all, FileType::NotFound};
+    }
+    if (hns_support == HNSSupport::kEnabled) {
+      return GetFileInfo(adlfs_client, location);
+    }
+    DCHECK_EQ(hns_support, HNSSupport::kDisabled);
+    auto container_client = GetBlobContainerClient(location.container);
+    return GetFileInfo(container_client, location);
+  }
+
  private:
   /// \pref location.container is not empty.
   template <typename ContainerClient>
@@ -1320,8 +1553,7 @@ class AzureFileSystem::Impl {
                                                          AzureFileSystem* fs) {
     RETURN_NOT_OK(ValidateFileLocation(location));
     auto blob_client = std::make_shared<Blobs::BlobClient>(
-        blob_service_client_->GetBlobContainerClient(location.container)
-            .GetBlobClient(location.path));
+        GetBlobClient(location.container, location.path));
 
     auto ptr = std::make_shared<ObjectInputFile>(blob_client, fs->io_context(),
                                                  std::move(location));
@@ -1340,8 +1572,7 @@ class AzureFileSystem::Impl {
     ARROW_ASSIGN_OR_RAISE(auto location, 
AzureLocation::FromString(info.path()));
     RETURN_NOT_OK(ValidateFileLocation(location));
     auto blob_client = std::make_shared<Blobs::BlobClient>(
-        blob_service_client_->GetBlobContainerClient(location.container)
-            .GetBlobClient(location.path));
+        GetBlobClient(location.container, location.path));
 
     auto ptr = std::make_shared<ObjectInputFile>(blob_client, fs->io_context(),
                                                  std::move(location), 
info.size());
@@ -1625,13 +1856,16 @@ class AzureFileSystem::Impl {
   /// \pre location.path is not empty.
   Status DeleteDirOnFileSystem(const DataLake::DataLakeFileSystemClient& 
adlfs_client,
                                const AzureLocation& location, bool recursive,
-                               bool require_dir_to_exist) {
+                               bool require_dir_to_exist,
+                               Azure::Nullable<std::string> lease_id = {}) {
     DCHECK(!location.container.empty());
     DCHECK(!location.path.empty());
     auto directory_client = adlfs_client.GetDirectoryClient(location.path);
+    DataLake::DeleteDirectoryOptions options;
+    options.AccessConditions.LeaseId = std::move(lease_id);
     try {
-      auto response =
-          recursive ? directory_client.DeleteRecursive() : 
directory_client.DeleteEmpty();
+      auto response = recursive ? directory_client.DeleteRecursive(options)
+                                : directory_client.DeleteEmpty(options);
       // Only the "*IfExists" functions ever set Deleted to false.
       // All the others either succeed or throw an exception.
       DCHECK(response.Value.Deleted);
@@ -1710,17 +1944,440 @@ class AzureFileSystem::Impl {
     return Status::OK();
   }
 
+ private:
+  /// \brief Create a BlobLeaseClient and acquire a lease on the container.
+  ///
+  /// \param allow_missing_container if true, a nullptr may be returned when 
the container
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes container not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease(
+      const AzureLocation& location, std::chrono::seconds lease_duration,
+      bool allow_missing_container = false, bool retry_allowed = true) {
+    DCHECK(!location.container.empty());
+    auto container_client = GetBlobContainerClient(location.container);
+    auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+    auto container_url = container_client.GetUrl();
+    auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(
+        std::move(container_client), std::move(lease_id));
+    try {
+      [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+      DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+    } catch (const Storage::StorageException& exception) {
+      if (IsContainerNotFound(exception)) {
+        if (allow_missing_container) {
+          return nullptr;
+        }
+        return PathNotFound(location);
+      } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+                 exception.ErrorCode == "LeaseAlreadyPresent") {
+        if (retry_allowed) {
+          LeaseGuard::WaitUntilLatestKnownExpiryTime();
+          return AcquireContainerLease(location, lease_duration, 
allow_missing_container,
+                                       /*retry_allowed=*/false);
+        }
+      }
+      return ExceptionToStatus(exception, "Failed to acquire a lease on 
container '",
+                               location.container, "': ", container_url);
+    }
+    return lease_client;
+  }
+
+  /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or
+  /// directory if Hierarchical Namespace is supported).
+  ///
+  /// \param allow_missing if true, a nullptr may be returned when the blob
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes blob not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease(
+      const AzureLocation& location, std::chrono::seconds lease_duration,
+      bool allow_missing = false, bool retry_allowed = true) {
+    DCHECK(!location.container.empty() && !location.path.empty());
+    auto path = std::string{internal::RemoveTrailingSlash(location.path)};
+    auto blob_client = GetBlobClient(location.container, std::move(path));
+    auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+    auto blob_url = blob_client.GetUrl();
+    auto lease_client = 
std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client),
+                                                                 
std::move(lease_id));
+    try {
+      [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+      DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+    } catch (const Storage::StorageException& exception) {
+      if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+        if (allow_missing) {
+          return nullptr;
+        }
+        return PathNotFound(location);
+      } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+                 exception.ErrorCode == "LeaseAlreadyPresent") {
+        if (retry_allowed) {
+          LeaseGuard::WaitUntilLatestKnownExpiryTime();
+          return AcquireBlobLease(location, lease_duration, allow_missing,
+                                  /*retry_allowed=*/false);
+        }
+      }
+      return ExceptionToStatus(exception, "Failed to acquire a lease on file 
'",
+                               location.all, "': ", blob_url);
+    }
+    return lease_client;
+  }
+
+  /// \brief The default lease duration used for acquiring a lease on a 
container or blob.
+  ///
+  /// Azure Storage leases can be acquired for a duration of 15 to 60 seconds.
+  ///
+  /// Operations consisting of an unpredictable number of sub-operations should
+  /// renew the lease periodically (heartbeat pattern) instead of acquiring an
+  /// infinite lease (very bad idea for a library like Arrow).
+  static constexpr auto kLeaseDuration = std::chrono::seconds{15};
+
+  // These are conservative estimates of how long it takes for the client
+  // request to reach the server counting from the moment the Azure SDK 
function
+  // issuing the request is called. See their usage for more context.
+  //
+  // If the client connection to the server is unpredictably slow, operations
+  // may fail, but due to the use of leases, the entire arrow::FileSystem
+  // operation can be retried without risk of data loss. Thus, unexpected 
network
+  // slow downs can be fixed with retries (either by some system using Arrow or
+  // an user interactively retrying a failed operation).
+  //
+  // If a network is permanently slow, the lease time and these numbers should 
be
+  // increased but not so much so that the client gives up an operation 
because the
+  // values say it takes more time to reach the server than the remaining lease
+  // time on the resources.
+  //
+  // NOTE: The initial constant values were chosen conservatively. If we learn,
+  // from experience, that they are causing issues, we can increase them. And 
if
+  // broadly applicable values aren't possible, we can make them configurable.
+  static constexpr auto kTimeNeededForContainerDeletion = 
std::chrono::seconds{3};
+  static constexpr auto kTimeNeededForContainerRename = 
std::chrono::seconds{3};
+  static constexpr auto kTimeNeededForFileOrDirectoryRename = 
std::chrono::seconds{3};
+
+ public:
+  /// The conditions for a successful container rename are derived from the
+  /// conditions for a successful `Move("/$src.container", 
"/$dest.container")`.
+  /// The numbers here match the list in `Move`.
+  ///
+  /// 1. `src.container` must exist.
+  /// 2. If `src.container` and `dest.container` are the same, do nothing and
+  ///    return OK.
+  /// 3. N/A.
+  /// 4. N/A.
+  /// 5. `dest.container` doesn't exist or is empty.
+  ///    NOTE: one exception related to container Move is that when the
+  ///    destination is empty we also require the source container to be empty,
+  ///    because the only way to perform the "Move" is by deleting the source
+  ///    instead of deleting the destination and renaming the source.
+  Status RenameContainer(const AzureLocation& src, const AzureLocation& dest) {
+    DCHECK(!src.container.empty() && src.path.empty());
+    DCHECK(!dest.container.empty() && dest.path.empty());
+    auto src_container_client = GetBlobContainerClient(src.container);
+
+    // If src and dest are the same, we only have to check src exists.
+    if (src.container == dest.container) {
+      ARROW_ASSIGN_OR_RAISE(auto info,
+                            GetContainerPropsAsFileInfo(src, 
src_container_client));
+      if (info.type() == FileType::NotFound) {
+        return PathNotFound(src);
+      }
+      DCHECK(info.type() == FileType::Directory);
+      return Status::OK();
+    }
+
+    // Acquire a lease on the source container because (1) we need the lease
+    // before rename and (2) it works as a way of checking the container 
exists.
+    ARROW_ASSIGN_OR_RAISE(auto src_lease_client,
+                          AcquireContainerLease(src, kLeaseDuration));
+    LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration};
+    // Check dest.container doesn't exist or is empty.
+    auto dest_container_client = GetBlobContainerClient(dest.container);
+    std::optional<LeaseGuard> dest_lease_guard;
+    bool dest_exists = false;
+    bool dest_is_empty = false;
+    ARROW_ASSIGN_OR_RAISE(
+        auto dest_lease_client,
+        AcquireContainerLease(dest, kLeaseDuration, 
/*allow_missing_container*/ true));
+    if (dest_lease_client) {
+      dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration);
+      dest_exists = true;
+      // Emptiness check after successful acquisition of the lease.
+      Blobs::ListBlobsOptions list_blobs_options;
+      list_blobs_options.PageSizeHint = 1;
+      try {
+        auto dest_list_response = 
dest_container_client.ListBlobs(list_blobs_options);
+        dest_is_empty = dest_list_response.Blobs.empty();
+        if (!dest_is_empty) {
+          return NotEmpty(dest);
+        }
+      } catch (const Storage::StorageException& exception) {
+        return ExceptionToStatus(exception, "Failed to check that '", 
dest.container,
+                                 "' is empty: ", 
dest_container_client.GetUrl());
+      }
+    }
+    DCHECK(!dest_exists || dest_is_empty);
+
+    if (!dest_exists) {
+      // Rename the source container.
+      Blobs::RenameBlobContainerOptions options;
+      options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId();
+      try {
+        src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerRename);
+        blob_service_client_->RenameBlobContainer(src.container, 
dest.container, options);
+        src_lease_guard.Forget();
+      } catch (const Storage::StorageException& exception) {
+        if (exception.StatusCode == Http::HttpStatusCode::BadRequest &&
+            exception.ErrorCode == "InvalidQueryParameterValue") {
+          auto param_name = 
exception.AdditionalInformation.find("QueryParameterName");
+          if (param_name != exception.AdditionalInformation.end() &&
+              param_name->second == "comp") {
+            return ExceptionToStatus(
+                exception, "The 'rename' operation is not supported on 
containers. ",
+                "Attempting a rename of '", src.container, "' to '", 
dest.container,
+                "': ", blob_service_client_->GetUrl());
+          }
+        }
+        return ExceptionToStatus(exception, "Failed to rename container '", 
src.container,
+                                 "' to '", dest.container,
+                                 "': ", blob_service_client_->GetUrl());
+      }
+    } else if (dest_is_empty) {
+      // Even if we deleted the empty dest.container, RenameBlobContainer() 
would still
+      // fail because containers are not immediately deleted by the service -- 
they are
+      // deleted asynchronously based on retention policies and 
non-deterministic behavior
+      // of the garbage collection process.
+      //
+      // One way to still match POSIX rename semantics is to delete the 
src.container if
+      // and only if it's empty because the final state would be equivalent to 
replacing
+      // the dest.container with the src.container and its contents (which 
happens
+      // to also be empty).
+      Blobs::ListBlobsOptions list_blobs_options;
+      list_blobs_options.PageSizeHint = 1;
+      try {
+        auto src_list_response = 
src_container_client.ListBlobs(list_blobs_options);
+        if (!src_list_response.Blobs.empty()) {
+          // Reminder: dest is used here because we're semantically replacing 
dest
+          // with src. By deleting src if it's empty just like dest.
+          return Status::IOError("Unable to replace empty container: '", 
dest.all, "'");
+        }
+        // Delete the source container now that we know it's empty.
+        Blobs::DeleteBlobContainerOptions options;
+        options.AccessConditions.LeaseId = src_lease_guard.LeaseId();
+        DCHECK(dest_lease_guard.has_value());
+        // Make sure lease on dest is still valid before deleting src. This is 
to ensure
+        // the destination container is not deleted by another process/client 
before
+        // Move() returns.
+        if (!dest_lease_guard->StillValidFor(kTimeNeededForContainerDeletion)) 
{
+          return Status::IOError("Unable to replace empty container: '", 
dest.all, "'. ",
+                                 "Preparation for the operation took too long 
and a "
+                                 "container lease expired.");
+        }
+        try {
+          src_lease_guard.BreakBeforeDeletion(kTimeNeededForContainerDeletion);
+          src_container_client.Delete(options);
+          src_lease_guard.Forget();
+        } catch (const Storage::StorageException& exception) {
+          return ExceptionToStatus(exception, "Failed to delete empty 
container: '",
+                                   src.container, "': ", 
src_container_client.GetUrl());
+        }
+      } catch (const Storage::StorageException& exception) {
+        return ExceptionToStatus(exception, "Unable to replace empty 
container: '",
+                                 dest.all, "': ", 
dest_container_client.GetUrl());
+      }
+    }
+    return Status::OK();
+  }
+
+  Status MoveContainerToPath(const AzureLocation& src, const AzureLocation& 
dest) {
+    DCHECK(!src.container.empty() && src.path.empty());
+    DCHECK(!dest.container.empty() && !dest.path.empty());
+    // Check Move pre-condition 1 -- `src` must exist.
+    auto container_client = GetBlobContainerClient(src.container);
+    ARROW_ASSIGN_OR_RAISE(auto src_info,
+                          GetContainerPropsAsFileInfo(src, container_client));
+    if (src_info.type() == FileType::NotFound) {
+      return PathNotFound(src);
+    }
+    // Check Move pre-condition 2.
+    if (src.container == dest.container) {
+      return InvalidDirMoveToSubdir(src, dest);
+    }
+    // Instead of checking more pre-conditions, we just abort with a
+    // NotImplemented status.
+    return CrossContainerMoveNotImplemented(src, dest);
+  }
+
+  Status CreateContainerFromPath(const AzureLocation& src, const 
AzureLocation& dest) {
+    DCHECK(!src.container.empty() && !src.path.empty());
+    DCHECK(!dest.empty() && dest.path.empty());
+    ARROW_ASSIGN_OR_RAISE(auto src_file_info, 
GetFileInfoOfPathWithinContainer(src));
+    switch (src_file_info.type()) {
+      case FileType::Unknown:
+      case FileType::NotFound:
+        return PathNotFound(src);
+      case FileType::File:
+        return Status::Invalid(
+            "Creating files at '/' is not possible, only directories.");
+      case FileType::Directory:
+        break;
+    }
+    if (src.container == dest.container) {
+      return InvalidDirMoveToSubdir(src, dest);
+    }
+    return CrossContainerMoveNotImplemented(src, dest);
+  }
+
+  Status MovePathWithDataLakeAPI(
+      const DataLake::DataLakeFileSystemClient& src_adlfs_client,
+      const AzureLocation& src, const AzureLocation& dest) {
+    DCHECK(!src.container.empty() && !src.path.empty());
+    DCHECK(!dest.container.empty() && !dest.path.empty());
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src.path)};
+    const auto dest_path = 
std::string{internal::RemoveTrailingSlash(dest.path)};
+
+    // Ensure that src exists and, if path has a trailing slash, that it's a 
directory.
+    ARROW_ASSIGN_OR_RAISE(auto src_lease_client, AcquireBlobLease(src, 
kLeaseDuration));
+    LeaseGuard src_lease_guard{std::move(src_lease_client), kLeaseDuration};
+    // It might be necessary to check src is a directory 0-3 times in this 
function,
+    // so we use a lazy evaluation function to avoid redundant calls to 
GetFileInfo().
+    std::optional<bool> src_is_dir_opt{};
+    auto src_is_dir_lazy = [&]() -> Result<bool> {
+      if (src_is_dir_opt.has_value()) {
+        return *src_is_dir_opt;
+      }
+      ARROW_ASSIGN_OR_RAISE(
+          auto src_info, GetFileInfo(src_adlfs_client, src, 
src_lease_guard.LeaseId()));
+      src_is_dir_opt = src_info.type() == FileType::Directory;
+      return *src_is_dir_opt;
+    };
+    // src must be a directory if it has a trailing slash.
+    if (internal::HasTrailingSlash(src.path)) {
+      ARROW_ASSIGN_OR_RAISE(auto src_is_dir, src_is_dir_lazy());
+      if (!src_is_dir) {
+        return NotADir(src);
+      }
+    }
+    // The Azure SDK and the backend don't perform many important checks, so 
we have to
+    // do them ourselves. Additionally, based on many tests on a 
default-configuration
+    // storage account, if the destination is an empty directory, the rename 
operation
+    // will most likely fail due to a timeout. Providing both leases -- to 
source and
+    // destination -- seems to have made things work.
+    ARROW_ASSIGN_OR_RAISE(auto dest_lease_client,
+                          AcquireBlobLease(dest, kLeaseDuration, 
/*allow_missing=*/true));
+    std::optional<LeaseGuard> dest_lease_guard;
+    if (dest_lease_client) {
+      dest_lease_guard.emplace(std::move(dest_lease_client), kLeaseDuration);
+      // Perform all the checks on dest (and src) before proceeding with the 
rename.
+      auto dest_adlfs_client = GetFileSystemClient(dest.container);
+      ARROW_ASSIGN_OR_RAISE(auto dest_info, GetFileInfo(dest_adlfs_client, 
dest,
+                                                        
dest_lease_guard->LeaseId()));
+      if (dest_info.type() == FileType::Directory) {
+        ARROW_ASSIGN_OR_RAISE(auto src_is_dir, src_is_dir_lazy());
+        if (!src_is_dir) {
+          // If src is a regular file, complain that dest is a directory
+          // like POSIX rename() does.
+          return internal::IsADir(dest.all);
+        }
+      } else {
+        if (internal::HasTrailingSlash(dest.path)) {
+          return NotADir(dest);
+        }
+      }
+    } else {
+      // If the destination has trailing slash, we would have to check that 
it's a
+      // directory, but since it doesn't exist we must return PathNotFound...
+      if (internal::HasTrailingSlash(dest.path)) {
+        // ...unless the src is a directory, in which case we can proceed with 
the rename.
+        ARROW_ASSIGN_OR_RAISE(auto src_is_dir, src_is_dir_lazy());
+        if (!src_is_dir) {
+          return PathNotFound(dest);
+        }
+      }
+    }
+
+    try {
+      // NOTE: The Azure SDK provides a RenameDirectory() function, but the
+      // implementation is the same as RenameFile() with the only difference 
being
+      // the type of the returned object (DataLakeDirectoryClient vs 
DataLakeFileClient).
+      //
+      // If we call RenameDirectory() and the source is a file, no error would
+      // be returned and the file would be renamed just fine (!).
+      //
+      // Since we don't use the returned object, we can just use RenameFile() 
for both
+      // files and directories. Ideally, the SDK would simply expose the 
PathClient
+      // that it uses internally for both files and directories.
+      DataLake::RenameFileOptions options;
+      options.DestinationFileSystem = dest.container;
+      options.SourceAccessConditions.LeaseId = src_lease_guard.LeaseId();
+      if (dest_lease_guard.has_value()) {
+        options.AccessConditions.LeaseId = dest_lease_guard->LeaseId();
+      }
+      src_lease_guard.BreakBeforeDeletion(kTimeNeededForFileOrDirectoryRename);
+      src_adlfs_client.RenameFile(src_path, dest_path, options);
+      src_lease_guard.Forget();
+    } catch (const Storage::StorageException& exception) {
+      // 
https://learn.microsoft.com/en-gb/rest/api/storageservices/datalakestoragegen2/path/create
+      if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+        if (exception.ErrorCode == "PathNotFound") {
+          return PathNotFound(src);
+        }
+        // "FilesystemNotFound" could be triggered by the source or 
destination filesystem
+        // not existing, but since we already checked the source filesystem 
exists (and
+        // hold a lease to it), we can assume the destination filesystem is 
the one the
+        // doesn't exist.
+        if (exception.ErrorCode == "FilesystemNotFound" ||
+            exception.ErrorCode == "RenameDestinationParentPathNotFound") {
+          return DestinationParentPathNotFound(dest);
+        }
+      } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+                 exception.ErrorCode == "PathAlreadyExists") {
+        // "PathAlreadyExists" is only produced when the destination exists 
and is a
+        // non-empty directory, so we produce the appropriate error.
+        return NotEmpty(dest);
+      }
+      return ExceptionToStatus(exception, "Failed to rename '", src.all, "' to 
'",
+                               dest.all, "': ", src_adlfs_client.GetUrl());
+    }
+    return Status::OK();
+  }
+
+  Status MovePathUsingBlobsAPI(const AzureLocation& src, const AzureLocation& 
dest) {
+    DCHECK(!src.container.empty() && !src.path.empty());
+    DCHECK(!dest.container.empty() && !dest.path.empty());
+    if (src.container != dest.container) {
+      ARROW_ASSIGN_OR_RAISE(auto src_file_info, 
GetFileInfoOfPathWithinContainer(src));
+      if (src_file_info.type() == FileType::NotFound) {
+        return PathNotFound(src);
+      }
+      return CrossContainerMoveNotImplemented(src, dest);
+    }
+    return Status::NotImplemented("The Azure FileSystem is not fully 
implemented");
+  }
+
+  Status MovePath(const AzureLocation& src, const AzureLocation& dest) {
+    DCHECK(!src.container.empty() && !src.path.empty());
+    DCHECK(!dest.container.empty() && !dest.path.empty());
+    auto src_adlfs_client = GetFileSystemClient(src.container);
+    ARROW_ASSIGN_OR_RAISE(auto hns_support,
+                          HierarchicalNamespaceSupport(src_adlfs_client));
+    if (hns_support == HNSSupport::kContainerNotFound) {
+      return PathNotFound(src);
+    }
+    if (hns_support == HNSSupport::kEnabled) {
+      return MovePathWithDataLakeAPI(src_adlfs_client, src, dest);
+    }
+    DCHECK_EQ(hns_support, HNSSupport::kDisabled);
+    return MovePathUsingBlobsAPI(src, dest);
+  }
+
   Status CopyFile(const AzureLocation& src, const AzureLocation& dest) {
     RETURN_NOT_OK(ValidateFileLocation(src));
     RETURN_NOT_OK(ValidateFileLocation(dest));
     if (src == dest) {
       return Status::OK();
     }
-    auto dest_blob_client = 
blob_service_client_->GetBlobContainerClient(dest.container)
-                                .GetBlobClient(dest.path);
-    auto src_url = blob_service_client_->GetBlobContainerClient(src.container)
-                       .GetBlobClient(src.path)
-                       .GetUrl();
+    auto dest_blob_client = GetBlobClient(dest.container, dest.path);
+    auto src_url = GetBlobClient(src.container, src.path).GetUrl();
     try {
       dest_blob_client.CopyFromUri(src_url);
     } catch (const Storage::StorageException& exception) {
@@ -1731,6 +2388,9 @@ class AzureFileSystem::Impl {
   }
 };
 
+std::atomic<LeaseGuard::SteadyClock::time_point> 
LeaseGuard::latest_known_expiry_time_ =
+    SteadyClock::time_point{SteadyClock::duration::zero()};
+
 AzureFileSystem::AzureFileSystem(std::unique_ptr<Impl>&& impl)
     : FileSystem(impl->io_context()), impl_(std::move(impl)) {
   default_async_is_sync_ = false;
@@ -1772,19 +2432,7 @@ Result<FileInfo> AzureFileSystem::GetFileInfo(const 
std::string& path) {
     auto container_client = impl_->GetBlobContainerClient(location.container);
     return GetContainerPropsAsFileInfo(location, container_client);
   }
-  // There is a path to search within the container. Check HNS support to 
proceed.
-  auto adlfs_client = impl_->GetFileSystemClient(location.container);
-  ARROW_ASSIGN_OR_RAISE(auto hns_support,
-                        impl_->HierarchicalNamespaceSupport(adlfs_client));
-  if (hns_support == HNSSupport::kContainerNotFound) {
-    return FileInfo{location.all, FileType::NotFound};
-  }
-  if (hns_support == HNSSupport::kEnabled) {
-    return impl_->GetFileInfo(adlfs_client, location);
-  }
-  DCHECK_EQ(hns_support, HNSSupport::kDisabled);
-  auto container_client = impl_->GetBlobContainerClient(location.container);
-  return impl_->GetFileInfo(container_client, location);
+  return impl_->GetFileInfoOfPathWithinContainer(location);
 }
 
 Result<FileInfoVector> AzureFileSystem::GetFileInfo(const FileSelector& 
select) {
@@ -1900,7 +2548,24 @@ Status AzureFileSystem::DeleteFile(const std::string& 
path) {
 }
 
 Status AzureFileSystem::Move(const std::string& src, const std::string& dest) {
-  return Status::NotImplemented("The Azure FileSystem is not fully 
implemented");
+  ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src));
+  ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest));
+  if (src_location.container.empty()) {
+    return Status::Invalid("Move requires a non-empty source path.");
+  }
+  if (dest_location.container.empty()) {
+    return Status::Invalid("Move requires a non-empty destination path.");
+  }
+  if (src_location.path.empty()) {
+    if (dest_location.path.empty()) {
+      return impl_->RenameContainer(src_location, dest_location);
+    }
+    return impl_->MoveContainerToPath(src_location, dest_location);
+  }
+  if (dest_location.path.empty()) {
+    return impl_->CreateContainerFromPath(src_location, dest_location);
+  }
+  return impl_->MovePath(src_location, dest_location);
 }
 
 Status AzureFileSystem::CopyFile(const std::string& src, const std::string& 
dest) {
diff --git a/cpp/src/arrow/filesystem/azurefs.h 
b/cpp/src/arrow/filesystem/azurefs.h
index 55f89ba477..2a131e40c0 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -210,6 +210,25 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {
 
   Status DeleteFile(const std::string& path) override;
 
+  /// \brief Move / rename a file or directory.
+  ///
+  /// There are no files immediately at the root directory, so paths like
+  /// "/segment" always refer to a container of the storage account and are
+  /// treated as directories.
+  ///
+  /// If `dest` exists but the operation fails for some reason, `Move`
+  /// guarantees `dest` is not lost.
+  ///
+  /// Conditions for a successful move:
+  /// 1. `src` must exist.
+  /// 2. `dest` can't contain a strict path prefix of `src`. More generally,
+  ///    a directory can't be made a subdirectory of itself.
+  /// 3. If `dest` already exists and it's a file, `src` must also be a file.
+  ///    `dest` is then replaced by `src`.
+  /// 4. All components of `dest` must exist, except for the last.
+  /// 5. If `dest` already exists and it's a directory, `src` must also be a
+  ///    directory and `dest` must be empty. `dest` is then replaced by `src`
+  ///    and its contents.
   Status Move(const std::string& src, const std::string& dest) override;
 
   Status CopyFile(const std::string& src, const std::string& dest) override;
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc 
b/cpp/src/arrow/filesystem/azurefs_test.cc
index 4d123028ea..c39a5b7d22 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -50,12 +50,15 @@
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/test_util.h"
 #include "arrow/result.h"
+#include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/pcg_random.h"
 #include "arrow/util/string.h"
+#include "arrow/util/unreachable.h"
 #include "arrow/util/value_parsing.h"
 
 namespace arrow {
@@ -335,7 +338,7 @@ TEST(AzureFileSystem, OptionsCompare) {
 
 struct PreexistingData {
  public:
-  using RNG = std::mt19937_64;
+  using RNG = random::pcg32_fast;
 
  public:
   const std::string container_name;
@@ -354,7 +357,10 @@ culpa qui officia deserunt mollit anim id est laborum.
   explicit PreexistingData(RNG& rng) : 
container_name{RandomContainerName(rng)} {}
 
   // Creates a path by concatenating the container name and the stem.
-  std::string ContainerPath(std::string_view stem) const {
+  std::string ContainerPath(std::string_view stem) const { return Path(stem); }
+
+  // Short alias to ContainerPath()
+  std::string Path(std::string_view stem) const {
     return ConcatAbstractPath(container_name, stem);
   }
 
@@ -391,7 +397,7 @@ culpa qui officia deserunt mollit anim id est laborum.
 class TestAzureFileSystem : public ::testing::Test {
  protected:
   // Set in constructor
-  std::mt19937_64 rng_;
+  random::pcg32_fast rng_;
 
   // Set in SetUp()
   int64_t debug_log_start_ = 0;
@@ -477,18 +483,41 @@ class TestAzureFileSystem : public ::testing::Test {
 
   Blobs::BlobContainerClient CreateContainer(const std::string& name) {
     auto container_client = blob_service_client_->GetBlobContainerClient(name);
-    (void)container_client.CreateIfNotExists();
+    ARROW_UNUSED(container_client.CreateIfNotExists());
     return container_client;
   }
 
+  DataLake::DataLakeFileSystemClient CreateFilesystem(const std::string& name) 
{
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(name);
+    ARROW_UNUSED(adlfs_client.CreateIfNotExists());
+    return adlfs_client;
+  }
+
   Blobs::BlobClient CreateBlob(Blobs::BlobContainerClient& container_client,
                                const std::string& name, const std::string& 
data = "") {
     auto blob_client = container_client.GetBlockBlobClient(name);
-    (void)blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(data.data()),
-                                 data.size());
+    ARROW_UNUSED(blob_client.UploadFrom(reinterpret_cast<const 
uint8_t*>(data.data()),
+                                        data.size()));
     return blob_client;
   }
 
+  DataLake::DataLakeFileClient CreateFile(
+      DataLake::DataLakeFileSystemClient& filesystem_client, const 
std::string& name,
+      const std::string& data = "") {
+    auto file_client = filesystem_client.GetFileClient(name);
+    ARROW_UNUSED(file_client.UploadFrom(reinterpret_cast<const 
uint8_t*>(data.data()),
+                                        data.size()));
+    return file_client;
+  }
+
+  DataLake::DataLakeDirectoryClient CreateDirectory(
+      DataLake::DataLakeFileSystemClient& adlfs_client, const std::string& 
name) {
+    EXPECT_TRUE(WithHierarchicalNamespace());
+    auto dir_client = adlfs_client.GetDirectoryClient(name);
+    dir_client.Create();
+    return dir_client;
+  }
+
   Blobs::Models::BlobProperties GetBlobProperties(const std::string& 
container_name,
                                                   const std::string& 
blob_name) {
     return blob_service_client_->GetBlobContainerClient(container_name)
@@ -507,8 +536,13 @@ class TestAzureFileSystem : public ::testing::Test {
 
   PreexistingData SetUpPreexistingData() {
     PreexistingData data(rng_);
-    auto container_client = CreateContainer(data.container_name);
-    CreateBlob(container_client, data.kObjectName, 
PreexistingData::kLoremIpsum);
+    if (WithHierarchicalNamespace()) {
+      auto filesystem_client = CreateFilesystem(data.container_name);
+      CreateFile(filesystem_client, data.kObjectName, 
PreexistingData::kLoremIpsum);
+    } else {
+      auto container_client = CreateContainer(data.container_name);
+      CreateBlob(container_client, data.kObjectName, 
PreexistingData::kLoremIpsum);
+    }
     return data;
   }
 
@@ -854,6 +888,393 @@ class TestAzureFileSystem : public ::testing::Test {
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false));
   }
+
+ private:
+  using StringMatcher =
+      
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
+
+  StringMatcher HasDirMoveToSubdirMessage(const std::string& src,
+                                          const std::string& dest) {
+    return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + 
src +
+                                "' a sub-directory of itself.");
+  }
+
+  StringMatcher HasCrossContainerNotImplementedMessage(const std::string& 
container_name,
+                                                       const std::string& 
dest) {
+    return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest 
+
+                                "' requires moving data between "
+                                "containers, which is not implemented.");
+  }
+
+  StringMatcher HasMissingParentDirMessage(const std::string& dest) {
+    return ::testing::HasSubstr("The parent directory of the destination path 
'" + dest +
+                                "' does not exist.");
+  }
+
+  /// \brief Expected POSIX semantics for the rename operation on multiple
+  /// scenarios.
+  ///
+  /// If the src doesn't exist, the error is always ENOENT, otherwise we are
+  /// left with the following combinations:
+  ///
+  /// 1. src's type
+  ///    a. File
+  ///    b. Directory
+  /// 2. dest's existence
+  ///    a. NotFound
+  ///    b. File
+  ///    c. Directory
+  ///       - empty
+  ///       - non-empty
+  /// 3. src path has a trailing slash (or not)
+  /// 4. dest path has a trailing slash (or not)
+  ///
+  /// Limitations: this function doesn't consider paths so it assumes that the
+  /// paths don't lead requests for moves that would make the source a subdir 
of
+  /// the destination.
+  ///
+  /// \param paths_are_equal src and dest paths without trailing slashes are 
equal
+  /// \return std::nullopt if success is expected in the scenario or the errno
+  /// if failure is expected.
+  static std::optional<int> RenameSemantics(FileType src_type, bool 
src_trailing_slash,
+                                            FileType dest_type, bool 
dest_trailing_slash,
+                                            bool dest_is_empty_dir = false,
+                                            bool paths_are_equal = false) {
+    DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown);
+    DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory)
+        << "dest_is_empty_dir must imply dest_type == FileType::Directory";
+    switch (src_type) {
+      case FileType::Unknown:
+        break;
+      case FileType::NotFound:
+        return {ENOENT};
+      case FileType::File:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            if (dest_trailing_slash) {
+              // A slash on the destination path requires that it exists,
+              // so a confirmation that it's a directory can be performed.
+              return {ENOENT};
+            }
+            return {};
+          case FileType::File:
+            if (src_trailing_slash || dest_trailing_slash) {
+              return {ENOTDIR};
+            }
+            // The existing file is replaced successfuly.
+            return {};
+          case FileType::Directory:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            return EISDIR;
+        }
+        break;
+      case FileType::Directory:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            // We don't have to care about the slashes when the source is a 
directory.
+            return {};
+          case FileType::File:
+            return {ENOTDIR};
+          case FileType::Directory:
+            if (!paths_are_equal && !dest_is_empty_dir) {
+              return {ENOTEMPTY};
+            }
+            return {};
+        }
+        break;
+    }
+    Unreachable("Invalid parameters passed to RenameSemantics");
+  }
+
+  Status CheckExpectedErrno(const std::string& src, const std::string& dest,
+                            std::optional<int> expected_errno,
+                            const char* expected_errno_name, FileInfo* 
out_src_info) {
+    auto the_fs = fs();
+    const bool src_trailing_slash = internal::HasTrailingSlash(src);
+    const bool dest_trailing_slash = internal::HasTrailingSlash(dest);
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src)};
+    const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)};
+    ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path));
+    ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path));
+    bool dest_is_empty_dir = false;
+    if (dest_info.type() == FileType::Directory) {
+      FileSelector select;
+      select.base_dir = dest_path;
+      select.recursive = false;
+      // TODO(ARROW-40014): investigate why this can't be false here
+      select.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select));
+      if (dest_contents.empty()) {
+        dest_is_empty_dir = true;
+      }
+    }
+    auto paths_are_equal = src_path == dest_path;
+    auto truly_expected_errno =
+        RenameSemantics(out_src_info->type(), src_trailing_slash, 
dest_info.type(),
+                        dest_trailing_slash, dest_is_empty_dir, 
paths_are_equal);
+    if (truly_expected_errno != expected_errno) {
+      if (expected_errno.has_value()) {
+        return Status::Invalid("expected_errno=", expected_errno_name, "=",
+                               *expected_errno,
+                               " used in ASSERT_MOVE is incorrect. "
+                               "POSIX semantics for this scenario require 
errno=",
+                               strerror(truly_expected_errno.value_or(0)));
+      } else {
+        DCHECK(truly_expected_errno.has_value());
+        return Status::Invalid(
+            "ASSERT_MOVE used to assert success in a scenario for which "
+            "POSIX semantics requires errno=",
+            strerror(*truly_expected_errno));
+      }
+    }
+    return Status::OK();
+  }
+
+  void AssertAfterMove(const std::string& src, const std::string& dest, 
FileType type) {
+    if (internal::RemoveTrailingSlash(src) != 
internal::RemoveTrailingSlash(dest)) {
+      AssertFileInfo(fs(), src, FileType::NotFound);
+    }
+    AssertFileInfo(fs(), dest, type);
+  }
+
+  static bool WithErrno(const Status& status, int expected_errno) {
+    auto* detail = status.detail().get();
+    return detail &&
+           arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == 
expected_errno;
+  }
+
+  std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& 
src_info,
+                                                       const std::string& src,
+                                                       const std::string& dest,
+                                                       int for_errno) {
+    switch (for_errno) {
+      case ENOENT: {
+        auto& path = src_info.type() == FileType::NotFound ? src : dest;
+        return ::testing::HasSubstr("Path does not exist '" + path + "'");
+      }
+      case ENOTEMPTY:
+        return ::testing::HasSubstr("Directory not empty: '" + dest + "'");
+    }
+    return std::nullopt;
+  }
+
+#define ASSERT_MOVE(src, dest, expected_errno)                                 
         \
+  do {                                                                         
         \
+    auto _src = (src);                                                         
         \
+    auto _dest = (dest);                                                       
         \
+    std::optional<int> _expected_errno = (expected_errno);                     
         \
+    FileInfo _src_info;                                                        
         \
+    ASSERT_OK(                                                                 
         \
+        CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, 
&_src_info)); \
+    auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, 
_dest));        \
+    if (_expected_errno.has_value()) {                                         
         \
+      if (WithErrno(_move_st, *_expected_errno)) {                             
         \
+        /* If the Move failed, the source should remain unchanged. */          
         \
+        AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, 
         \
+                       _src_info.type());                                      
         \
+        auto _message_matcher =                                                
         \
+            MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); 
         \
+        if (_message_matcher.has_value()) {                                    
         \
+          EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, 
_move_st);        \
+        } else {                                                               
         \
+          SUCCEED();                                                           
         \
+        }                                                                      
         \
+      } else {                                                                 
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' did not fail with errno=" << #expected_errno;             
         \
+      }                                                                        
         \
+    } else {                                                                   
         \
+      if (!_move_st.ok()) {                                                    
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' failed with " << _move_st.ToString();                     
         \
+      } else {                                                                 
         \
+        AssertAfterMove(_src, _dest, _src_info.type());                        
         \
+      }                                                                        
         \
+    }                                                                          
         \
+  } while (false)
+
+#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt)
+
+  // Tests for Move()
+
+ public:
+  void TestRenameContainer() {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    auto data = SetUpPreexistingData();
+    // Container exists, so renaming to the same name succeeds because it's a 
no-op.
+    ASSERT_MOVE_OK(data.container_name, data.container_name);
+    // Renaming a container that doesn't exist fails.
+    ASSERT_MOVE("missing-container", "missing-container", ENOENT);
+    ASSERT_MOVE("missing-container", data.container_name, ENOENT);
+    // Renaming a container to an existing non-empty container fails.
+    auto non_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto non_empty_container_client = CreateContainer(non_empty_container);
+    CreateBlob(non_empty_container_client, "object1", 
PreexistingData::kLoremIpsum);
+    ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY);
+    // Renaming to an empty container fails to replace it
+    auto empty_container = PreexistingData::RandomContainerName(rng_);
+    auto empty_container_client = CreateContainer(empty_container);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError,
+        ::testing::HasSubstr("Unable to replace empty container: '" + 
empty_container +
+                             "'"),
+        fs()->Move(data.container_name, empty_container));
+    // Renaming to a non-existing container creates it
+    auto missing_container = PreexistingData::RandomContainerName(rng_);
+    AssertFileInfo(fs(), missing_container, FileType::NotFound);
+    if (env->backend() == AzureBackend::kAzurite) {
+      // Azurite returns a 201 Created for RenameBlobContainer, but the created
+      // container doesn't contain the blobs from the source container and
+      // the source container remains undeleted after the "rename".
+    } else {
+      // See Azure SDK issue/question:
+      // https://github.com/Azure/azure-sdk-for-cpp/issues/5262
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          IOError,
+          ::testing::HasSubstr("The 'rename' operation is not supported on 
containers."),
+          fs()->Move(data.container_name, missing_container));
+      // ASSERT_MOVE_OK(data.container_name, missing_container);
+      // AssertFileInfo(fs(),
+      //                ConcatAbstractPath(missing_container,
+      //                PreexistingData::kObjectName), FileType::File);
+    }
+    // Renaming to an empty container can work if the source is also empty
+    auto new_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto new_empty_container_client = CreateContainer(new_empty_container);
+    ASSERT_MOVE_OK(empty_container, new_empty_container);
+  }
+
+  void TestMoveContainerToPath() {
+    auto data = SetUpPreexistingData();
+    ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        HasDirMoveToSubdirMessage(data.container_name, 
data.ContainerPath("new-subdir")),
+        fs()->Move(data.container_name, data.ContainerPath("new-subdir")));
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(data.container_name,
+                                               "missing-container/new-subdir"),
+        fs()->Move(data.container_name, "missing-container/new-subdir"));
+  }
+
+  void TestCreateContainerFromPath() {
+    auto data = SetUpPreexistingData();
+    auto missing_path = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path, "new-container", ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        ::testing::HasSubstr("Creating files at '/' is not possible, only 
directories."),
+        fs()->Move(data.ObjectPath(), "new-file"));
+    auto src_dir_path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs()->CreateDir(src_dir_path, false));
+    EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path));
+    EXPECT_EQ(src_dir_info.type(), FileType::Directory);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"),
+        fs()->Move(src_dir_path, "new-container"));
+  }
+
+  void TestMovePath() {
+    Status st;
+    auto data = SetUpPreexistingData();
+    // When source doesn't exist.
+    ASSERT_MOVE("missing-container/src-path", data.ContainerPath("dest-path"), 
ENOENT);
+    auto missing_path1 = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path1, "missing-container/path", ENOENT);
+
+    // But when source exists...
+    if (!WithHierarchicalNamespace()) {
+      // ...and containers are different, we get an error message telling 
cross-container
+      // moves are not implemented.
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          NotImplemented,
+          HasCrossContainerNotImplementedMessage(data.ObjectPath(),
+                                                 "missing-container/path"),
+          fs()->Move(data.ObjectPath(), "missing-container/path"));
+      GTEST_SKIP() << "The rest of TestMovePath is not implemented for non-HNS 
scenarios";
+    }
+    auto adlfs_client =
+        datalake_service_client_->GetFileSystemClient(data.container_name);
+    // ...and dest.container doesn't exist.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage("missing-container/path"),
+        fs()->Move(data.ObjectPath(), "missing-container/path"));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage(data.Path("missing-subdir/file")),
+        fs()->Move(data.ObjectPath(), data.Path("missing-subdir/file")));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    // src is a file and dest does not exists
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("file1/"), ENOENT);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1/"), ENOTDIR);
+    // "file0" exists
+
+    // src is a file and dest exists (as a file)
+    CreateFile(adlfs_client, PreexistingData::kObjectName, 
PreexistingData::kLoremIpsum);
+    CreateFile(adlfs_client, "file1", PreexistingData::kLoremIpsum);
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1"), data.Path("file0/"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0/"), ENOTDIR);
+    // "file0" and "file1" exist
+
+    // src is a file and dest exists (as an empty dir)
+    CreateDirectory(adlfs_client, "subdir0");
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR);
+
+    // src is a file and dest exists (as a non-empty dir)
+    CreateFile(adlfs_client, "subdir0/file-at-subdir");
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR);
+    // "subdir0/file-at-subdir" exists
+
+    // src is a directory and dest does not exists
+    ASSERT_MOVE_OK(data.Path("subdir0"), data.Path("subdir1"));
+    ASSERT_MOVE_OK(data.Path("subdir1/"), data.Path("subdir2"));
+    ASSERT_MOVE_OK(data.Path("subdir2"), data.Path("subdir3/"));
+    ASSERT_MOVE_OK(data.Path("subdir3/"), data.Path("subdir4/"));
+    AssertFileInfo(fs(), data.Path("subdir4/file-at-subdir"), FileType::File);
+    // "subdir4/file-at-subdir" exists
+
+    // src is a directory and dest exists as an empty directory
+    CreateDirectory(adlfs_client, "subdir0");
+    CreateDirectory(adlfs_client, "subdir1");
+    CreateDirectory(adlfs_client, "subdir2");
+    CreateDirectory(adlfs_client, "subdir3");
+    ASSERT_MOVE_OK(data.Path("subdir4"), data.Path("subdir0"));
+    ASSERT_MOVE_OK(data.Path("subdir0/"), data.Path("subdir1"));
+    ASSERT_MOVE_OK(data.Path("subdir1"), data.Path("subdir2/"));
+    ASSERT_MOVE_OK(data.Path("subdir2/"), data.Path("subdir3/"));
+    AssertFileInfo(fs(), data.Path("subdir3/file-at-subdir"), FileType::File);
+    // "subdir3/file-at-subdir" exists
+
+    // src is directory and dest exists as a non-empty directory
+    CreateDirectory(adlfs_client, "subdir0");
+    ASSERT_MOVE(data.Path("subdir0"), data.Path("subdir3"), ENOTEMPTY);
+    ASSERT_MOVE(data.Path("subdir0/"), data.Path("subdir3"), ENOTEMPTY);
+    ASSERT_MOVE(data.Path("subdir0"), data.Path("subdir3/"), ENOTEMPTY);
+    ASSERT_MOVE(data.Path("subdir0/"), data.Path("subdir3/"), ENOTEMPTY);
+  }
 };
 
 void TestAzureFileSystem::TestDetectHierarchicalNamespace(bool 
trip_up_azurite) {
@@ -940,10 +1361,9 @@ void 
TestAzureFileSystem::TestGetFileInfoObjectWithNestedStructure() {
                  FileType::NotFound);
 
   if (WithHierarchicalNamespace()) {
-    datalake_service_client_->GetFileSystemClient(data.container_name)
-        .GetDirectoryClient("test-empty-object-dir")
-        .Create();
-
+    auto adlfs_client =
+        datalake_service_client_->GetFileSystemClient(data.container_name);
+    CreateDirectory(adlfs_client, "test-empty-object-dir");
     AssertFileInfo(fs(), data.ContainerPath("test-empty-object-dir"),
                    FileType::Directory);
   }
@@ -1108,6 +1528,20 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, 
DeleteDirContentsFailureNonexisten
   this->TestDeleteDirContentsFailureNonexistent();
 }
 
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, RenameContainer) {
+  this->TestRenameContainer();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, MoveContainerToPath) {
+  this->TestMoveContainerToPath();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateContainerFromPath) {
+  this->TestCreateContainerFromPath();
+}
+
+TYPED_TEST(TestAzureFileSystemOnAllScenarios, MovePath) { 
this->TestMovePath(); }
+
 // Tests using Azurite (the local Azure emulator)
 
 TEST_F(TestAzuriteFileSystem, GetFileInfoSelector) {
@@ -1384,9 +1818,14 @@ TEST_F(TestAzuriteFileSystem, 
DeleteDirContentsFailureNonexistent) {
 
 TEST_F(TestAzuriteFileSystem, DeleteFileSuccess) {
   const auto container_name = PreexistingData::RandomContainerName(rng_);
-  ASSERT_OK(fs()->CreateDir(container_name));
-  const auto file_name = ConcatAbstractPath(container_name, "abc");
-  CreateFile(fs(), file_name, "data");
+  const auto file_name = ConcatAbstractPath(container_name, "filename");
+  if (WithHierarchicalNamespace()) {
+    auto adlfs_client = CreateFilesystem(container_name);
+    CreateFile(adlfs_client, "filename", "data");
+  } else {
+    auto container = CreateContainer(container_name);
+    CreateBlob(container, "filename", "data");
+  }
   arrow::fs::AssertFileInfo(fs(), file_name, FileType::File);
   ASSERT_OK(fs()->DeleteFile(file_name));
   arrow::fs::AssertFileInfo(fs(), file_name, FileType::NotFound);
@@ -1394,24 +1833,38 @@ TEST_F(TestAzuriteFileSystem, DeleteFileSuccess) {
 
 TEST_F(TestAzuriteFileSystem, DeleteFileFailureNonexistent) {
   const auto container_name = PreexistingData::RandomContainerName(rng_);
-  ASSERT_OK(fs()->CreateDir(container_name));
   const auto nonexistent_file_name = ConcatAbstractPath(container_name, 
"nonexistent");
+  if (WithHierarchicalNamespace()) {
+    ARROW_UNUSED(CreateFilesystem(container_name));
+  } else {
+    ARROW_UNUSED(CreateContainer(container_name));
+  }
   ASSERT_RAISES(IOError, fs()->DeleteFile(nonexistent_file_name));
 }
 
 TEST_F(TestAzuriteFileSystem, DeleteFileFailureContainer) {
   const auto container_name = PreexistingData::RandomContainerName(rng_);
-  ASSERT_OK(fs()->CreateDir(container_name));
+  if (WithHierarchicalNamespace()) {
+    ARROW_UNUSED(CreateFilesystem(container_name));
+  } else {
+    ARROW_UNUSED(CreateContainer(container_name));
+  }
   arrow::fs::AssertFileInfo(fs(), container_name, FileType::Directory);
   ASSERT_RAISES(IOError, fs()->DeleteFile(container_name));
 }
 
 TEST_F(TestAzuriteFileSystem, DeleteFileFailureDirectory) {
-  const auto directory_name =
-      ConcatAbstractPath(PreexistingData::RandomContainerName(rng_), 
"directory");
-  ASSERT_OK(fs()->CreateDir(directory_name));
-  arrow::fs::AssertFileInfo(fs(), directory_name, FileType::Directory);
-  ASSERT_RAISES(IOError, fs()->DeleteFile(directory_name));
+  auto container_name = PreexistingData::RandomContainerName(rng_);
+  if (WithHierarchicalNamespace()) {
+    auto adlfs_client = CreateFilesystem(container_name);
+    CreateDirectory(adlfs_client, "directory");
+  } else {
+    auto container = CreateContainer(container_name);
+    CreateBlob(container, "directory/");
+  }
+  auto directory_path = ConcatAbstractPath(container_name, "directory");
+  arrow::fs::AssertFileInfo(fs(), directory_path, FileType::Directory);
+  ASSERT_RAISES(IOError, fs()->DeleteFile(directory_path));
 }
 
 TEST_F(TestAzuriteFileSystem, CopyFileSuccessDestinationNonexistent) {
@@ -1542,7 +1995,7 @@ std::shared_ptr<const KeyValueMetadata> 
NormalizerKeyValueMetadata(
     auto value = metadata->value(i);
     if (key == "Content-Hash") {
       std::vector<uint8_t> output;
-      output.reserve(value.size() / 2);
+      output.resize(value.size() / 2);
       if (ParseHexValues(value, output.data()).ok()) {
         // Valid value
         value = std::string(value.size(), 'F');
diff --git a/cpp/src/arrow/filesystem/util_internal.cc 
b/cpp/src/arrow/filesystem/util_internal.cc
index 13f43d45db..8747f9683b 100644
--- a/cpp/src/arrow/filesystem/util_internal.cc
+++ b/cpp/src/arrow/filesystem/util_internal.cc
@@ -64,11 +64,21 @@ Status PathNotFound(std::string_view path) {
       .WithDetail(StatusDetailFromErrno(ENOENT));
 }
 
+Status IsADir(std::string_view path) {
+  return Status::IOError("Is a directory: '", path, "'")
+      .WithDetail(StatusDetailFromErrno(EISDIR));
+}
+
 Status NotADir(std::string_view path) {
   return Status::IOError("Not a directory: '", path, "'")
       .WithDetail(StatusDetailFromErrno(ENOTDIR));
 }
 
+Status NotEmpty(std::string_view path) {
+  return Status::IOError("Directory not empty: '", path, "'")
+      .WithDetail(StatusDetailFromErrno(ENOTEMPTY));
+}
+
 Status NotAFile(std::string_view path) {
   return Status::IOError("Not a regular file: '", path, "'");
 }
diff --git a/cpp/src/arrow/filesystem/util_internal.h 
b/cpp/src/arrow/filesystem/util_internal.h
index 29a51512d0..96cc5178a9 100644
--- a/cpp/src/arrow/filesystem/util_internal.h
+++ b/cpp/src/arrow/filesystem/util_internal.h
@@ -43,9 +43,15 @@ Status CopyStream(const std::shared_ptr<io::InputStream>& 
src,
 ARROW_EXPORT
 Status PathNotFound(std::string_view path);
 
+ARROW_EXPORT
+Status IsADir(std::string_view path);
+
 ARROW_EXPORT
 Status NotADir(std::string_view path);
 
+ARROW_EXPORT
+Status NotEmpty(std::string_view path);
+
 ARROW_EXPORT
 Status NotAFile(std::string_view path);
 
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 751ef28d41..b693336e09 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -449,6 +449,13 @@ std::shared_ptr<StatusDetail> StatusDetailFromErrno(int 
errnum) {
   return std::make_shared<ErrnoDetail>(errnum);
 }
 
+std::optional<int> ErrnoFromStatusDetail(const StatusDetail& detail) {
+  if (detail.type_id() == kErrnoDetailTypeId) {
+    return checked_cast<const ErrnoDetail&>(detail).errnum();
+  }
+  return std::nullopt;
+}
+
 #if _WIN32
 std::shared_ptr<StatusDetail> StatusDetailFromWinError(int errnum) {
   if (!errnum) {
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 113b1bdd93..bba71c0d80 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -23,6 +23,7 @@
 
 #include <atomic>
 #include <memory>
+#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
@@ -264,6 +265,8 @@ std::string WinErrorMessage(int errnum);
 
 ARROW_EXPORT
 std::shared_ptr<StatusDetail> StatusDetailFromErrno(int errnum);
+ARROW_EXPORT
+std::optional<int> ErrnoFromStatusDetail(const StatusDetail& detail);
 #if _WIN32
 ARROW_EXPORT
 std::shared_ptr<StatusDetail> StatusDetailFromWinError(int errnum);

Reply via email to