kou commented on code in PR #39904:
URL: https://github.com/apache/arrow/pull/39904#discussion_r1475591027


##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -481,6 +487,12 @@ class TestAzureFileSystem : public ::testing::Test {
     return container_client;
   }
 
+  DataLake::DataLakeFileSystemClient CreateFilesystem(const std::string& name) 
{
+    auto adlfs_client = datalake_service_client_->GetFileSystemClient(name);
+    (void)adlfs_client.CreateIfNotExists();

Review Comment:
   ```suggestion
       ARROW_UNUSED(adlfs_client.CreateIfNotExists());
   ```



##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -210,6 +210,25 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {
 
   Status DeleteFile(const std::string& path) override;
 
+  /// \brief Move / rename a file or directory.
+  ///
+  /// There are no files immediately at the root directory, so paths like
+  /// "/segment" always refer to a container of the storage account and are
+  /// treated as directories.
+  ///
+  /// If `dest` exists but the operation fails for some reason, `Move`
+  /// guarantees `dest` is not lost.
+  ///
+  /// Conditions for a successful move:
+  /// 1. `src` must exist.
+  /// 2. `dest` can't contain a strict path prefix of `src`. More generally,
+  ///    a directory can't be made a subdirectory of itself.
+  /// 3. If `dest` already exists and its a file, `src` must also be a file.

Review Comment:
   ```suggestion
     /// 3. If `dest` already exists and it's a file, `src` must also be a file.
   ```



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -942,6 +972,183 @@ FileInfo FileInfoFromBlob(std::string_view container,
   return info;
 }
 
+/// \brief RAII-style guard for releasing a lease on a blob or container.
+///
+/// The guard should be constructed right after a successful 
BlobLeaseClient::Acquire()
+/// call. Use std::optional<LeaseGuard> to declare a guard in outer scope and 
construct it
+/// later with std::optional::emplace(...).
+///
+/// Leases expire automatically, but explicit release means concurrent clients 
or
+/// ourselves when trying new operations on the same blob or container don't 
have
+/// to wait for the lease to expire by itself.
+///
+/// Learn more about leases at
+/// https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob
+class LeaseGuard {
+ public:
+  using SteadyClock = std::chrono::steady_clock;
+
+ private:
+  /// \brief The time when the lease expires or is broken.
+  ///
+  /// The lease is not guaranteed to be valid until this time, but it is 
guaranteed to
+  /// be expired after this time. In other words, this is an overestimation of
+  /// the true time_point.
+  SteadyClock::time_point break_or_expires_at_;
+  const std::unique_ptr<Blobs::BlobLeaseClient> lease_client_;
+  bool release_attempt_pending_ = true;
+
+  /// \brief The latest known expiry time of a lease guarded by this class
+  /// that failed to be released or was forgotten by calling Forget().
+  static std::atomic<SteadyClock::time_point> latest_known_expiry_time_;
+
+  /// \brief The maximum lease duration supported by Azure Storage.
+  static constexpr std::chrono::seconds kMaxLeaseDuration{60};
+
+ public:
+  LeaseGuard(std::unique_ptr<Blobs::BlobLeaseClient> lease_client,
+             std::chrono::seconds lease_duration)
+      : break_or_expires_at_(SteadyClock::now() +
+                             std::min(kMaxLeaseDuration, lease_duration)),
+        lease_client_(std::move(lease_client)) {
+    DCHECK(lease_duration <= kMaxLeaseDuration);
+    DCHECK(this->lease_client_);
+  }
+
+  ARROW_DISALLOW_COPY_AND_ASSIGN(LeaseGuard);
+
+  ~LeaseGuard() {
+    // No point in trying any error handling here other than the debug 
checking. The lease
+    // will eventually expire on the backend without any intervention from us 
(just much
+    // later than if we released it).
+    [[maybe_unused]] auto status = Release();
+    ARROW_LOG(DEBUG) << status;
+  }
+
+  bool PendingRelease() const {
+    return release_attempt_pending_ && SteadyClock::now() <= 
break_or_expires_at_;
+  }
+
+ private:
+  Status DoRelease() {
+    DCHECK(release_attempt_pending_);
+    try {
+      lease_client_->Release();
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(exception, "Failed to release the ",
+                               lease_client_->GetLeaseId(), " lease on '",
+                               lease_client_->GetUrl(), "'");
+    }
+    return Status::OK();
+  }
+
+ public:
+  std::string LeaseId() const { return lease_client_->GetLeaseId(); }
+
+  bool StillValidFor(SteadyClock::duration expected_time_left) const {
+    return SteadyClock::now() + expected_time_left < break_or_expires_at_;
+  }
+
+  /// \brief Break the lease.
+  ///
+  /// The lease will stay in the "Breaking" state for break_period seconds or
+  /// less if the lease is expiring before that.
+  ///
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-container#outcomes-of-use-attempts-on-containers-by-lease-state
+  /// 
https://learn.microsoft.com/en-us/rest/api/storageservices/lease-blob#outcomes-of-use-attempts-on-blobs-by-lease-state
+  Status Break(Azure::Nullable<std::chrono::seconds> break_period = {}) {
+    auto remaining_time_ms = [this]() {
+      const auto remaing_time = break_or_expires_at_ - SteadyClock::now();
+      return 
std::chrono::duration_cast<std::chrono::milliseconds>(remaing_time);
+    };
+#ifndef NDEBUG
+    if (break_period.HasValue() && !StillValidFor(*break_period)) {
+      ARROW_LOG(WARNING)
+          << "Azure Storage: requested break_period ("
+          << break_period.ValueOr(std::chrono::seconds{0}).count()
+          << "s) is too long or lease duration is too short for all the 
operations "
+             "performed so far (lease expires in "
+          << remaining_time_ms().count() << "ms)";
+    }
+#endif
+    Blobs::BreakLeaseOptions options;
+    options.BreakPeriod = break_period;
+    try {
+      lease_client_->Break(options);
+      break_or_expires_at_ =
+          std::min(break_or_expires_at_,
+                   SteadyClock::now() + 
break_period.ValueOr(std::chrono::seconds{0}));
+    } catch (const Storage::StorageException& exception) {
+      return ExceptionToStatus(
+          exception, "Failed to break the ", lease_client_->GetLeaseId(), " 
lease on '",
+          lease_client_->GetUrl(), "' expiring in ", 
remaining_time_ms().count(), "ms");
+    }
+    return Status::OK();
+  }
+
+  /// \brief Break the lease before deleting or renaming the resource.
+  ///
+  /// Calling this is recommended when the resource for which the lease was 
acquired is
+  /// about to be deleted as there is no way of releasing the lease after 
that, we can
+  /// only forget about it. The break_period should be a conservative estimate 
of the time
+  /// it takes to delete/rename the resource.
+  ///
+  /// If break_period is too small, the delete/rename will fail with a lease 
conflict,
+  /// and if it's too large the only consequence is that a lease on a 
non-existent
+  /// resource will remain in the "Breaking" state for a while blocking others
+  /// from recreating the resource.
+  void BreakBeforeDeletion(std::chrono::seconds break_period) {
+    ARROW_CHECK_OK(Break(break_period));
+  }
+
+  ARROW_NOINLINE Status Release() {
+    if (!PendingRelease()) {
+      return Status::OK();
+    }
+    auto status = DoRelease();
+    if (!status.ok()) {
+      Forget();
+      return status;
+    }
+    release_attempt_pending_ = false;
+    return Status::OK();
+  }
+
+  /// \brief Prevent any release attempts in the destructor.
+  ///
+  /// When it's known they would certainly fail.
+  /// \see LeaseGuard::BreakBeforeDeletion()
+  ARROW_NOINLINE void Forget() {
+    if (!PendingRelease()) {
+      release_attempt_pending_ = false;
+      return;
+    }
+    release_attempt_pending_ = false;
+    // Remember the latest known expiry time so we can gracefully handle lease
+    // acquisition failures by waiting until the latest forgotten lease.
+    auto latest = latest_known_expiry_time_.load(std::memory_order_relaxed);
+    while (
+        latest < break_or_expires_at_ &&
+        !latest_known_expiry_time_.compare_exchange_weak(latest, 
break_or_expires_at_)) {
+    }
+    DCHECK_GE(latest_known_expiry_time_.load(), break_or_expires_at_);

Review Comment:
   Is it safe?
   I think that `latest_known_expiry_time_` may be changed between 
`latest_known_expiry_time_.compare_exchange_weak()` and 
`latest_known_expiry_time_.load()`.



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -854,6 +888,397 @@ class TestAzureFileSystem : public ::testing::Test {
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false));
   }
+
+ private:
+  using StringMatcher =
+      
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
+
+  StringMatcher HasDirMoveToSubdirMessage(const std::string& src,
+                                          const std::string& dest) {
+    return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + 
src +
+                                "' a sub-directory of itself.");
+  }
+
+  StringMatcher HasCrossContainerNotImplementedMessage(const std::string& 
container_name,
+                                                       const std::string& 
dest) {
+    return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest 
+
+                                "' requires moving data between "
+                                "containers, which is not implemented.");
+  }
+
+  StringMatcher HasMissingParentDirMessage(const std::string& dest) {
+    return ::testing::HasSubstr("The parent directory of the destination path 
'" + dest +
+                                "' does not exist.");
+  }
+
+  /// \brief Expected POSIX semantics for the rename operation on multiple
+  /// scenarios.
+  ///
+  /// If the src doesn't exist, the error is always ENOENT, otherwise we are
+  /// left with the following combinations:
+  ///
+  /// 1. src's type
+  ///    a. File
+  ///    b. Directory
+  /// 2. dest's existence
+  ///    a. NotFound
+  ///    b. File
+  ///    c. Directory
+  ///       - empty
+  ///       - non-empty
+  /// 3. src path has a trailing slash (or not)
+  /// 4. dest path has a trailing slash (or not)
+  ///
+  /// Limitations: this function doesn't consider paths so it assumes that the
+  /// paths don't lead requests for moves that would make the source a subdir 
of
+  /// the destination.
+  ///
+  /// \param paths_are_equal src and dest paths without trailing slashes are 
equal
+  /// \return std::nullopt if success is expected in the scenario or the errno
+  /// if failure is expected.
+  static std::optional<int> RenameSemantics(FileType src_type, bool 
src_trailing_slash,
+                                            FileType dest_type, bool 
dest_trailing_slash,
+                                            bool dest_is_empty_dir = false,
+                                            bool paths_are_equal = false) {
+    DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown);
+    DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory)
+        << "dest_is_empty_dir must imply dest_type == FileType::Directory";
+    switch (src_type) {
+      case FileType::Unknown:
+        break;
+      case FileType::NotFound:
+        return {ENOENT};
+      case FileType::File:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            if (dest_trailing_slash) {
+              // A slash on the destination path requires that it exists,
+              // so a confirmation that it's a directory can be performed.
+              return {ENOENT};
+            }
+            return {};
+          case FileType::File:
+            if (src_trailing_slash || dest_trailing_slash) {
+              return {ENOTDIR};
+            }
+            // The existing file is replaced successfully.
+            return {};
+          case FileType::Directory:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            return EISDIR;
+        }
+        break;
+      case FileType::Directory:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            // We don't have to care about the slashes when the source is a 
directory.
+            return {};
+          case FileType::File:
+            return {ENOTDIR};
+          case FileType::Directory:
+            if (!paths_are_equal && !dest_is_empty_dir) {
+              return {ENOTEMPTY};
+            }
+            return {};
+        }
+        break;
+    }
+    Unreachable("Invalid parameters passed to RenameSemantics");
+  }
+
+  Status CheckExpectedErrno(const std::string& src, const std::string& dest,
+                            std::optional<int> expected_errno,
+                            const char* expected_errno_name, FileInfo* 
out_src_info) {
+    auto the_fs = fs();
+    const bool src_trailing_slash = internal::HasTrailingSlash(src);
+    const bool dest_trailing_slash = internal::HasTrailingSlash(dest);
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src)};
+    const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)};
+    ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path));
+    ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path));
+    bool dest_is_empty_dir = false;
+    if (dest_info.type() == FileType::Directory) {
+      FileSelector select;
+      select.base_dir = dest_path;
+      select.recursive = false;
+      // TODO(felipecrv): investigate why this can't be false
+      select.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select));
+      if (dest_contents.empty()) {
+        dest_is_empty_dir = true;
+      }
+    }
+    auto paths_are_equal = src_path == dest_path;
+    auto truly_expected_errno =
+        RenameSemantics(out_src_info->type(), src_trailing_slash, 
dest_info.type(),
+                        dest_trailing_slash, dest_is_empty_dir, 
paths_are_equal);
+    if (truly_expected_errno != expected_errno) {
+      if (expected_errno.has_value()) {
+        return Status::Invalid("expected_errno=", expected_errno_name, "=",
+                               *expected_errno,
+                               " used in ASSERT_MOVE is incorrect. "
+                               "POSIX semantics for this scenario require 
errno=",
+                               strerror(truly_expected_errno.value_or(0)));
+      } else {
+        DCHECK(truly_expected_errno.has_value());
+        return Status::Invalid(
+            "ASSERT_MOVE used to assert success in a scenario for which "
+            "POSIX semantics requires errno=",
+            strerror(*truly_expected_errno));
+      }
+    }
+    return Status::OK();
+  }
+
+  void AssertAfterMove(const std::string& src, const std::string& dest, 
FileType type) {
+    if (internal::RemoveTrailingSlash(src) != 
internal::RemoveTrailingSlash(dest)) {
+      AssertFileInfo(fs(), src, FileType::NotFound);
+    }
+    AssertFileInfo(fs(), dest, type);
+  }
+
+  static bool WithErrno(const Status& status, int expected_errno) {
+    auto* detail = status.detail().get();
+    return detail &&
+           arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == 
expected_errno;
+  }
+
+  std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& 
src_info,
+                                                       const std::string& src,
+                                                       const std::string& dest,
+                                                       int for_errno) {
+    switch (for_errno) {
+      case ENOENT: {
+        auto& path = src_info.type() == FileType::NotFound ? src : dest;
+        return ::testing::HasSubstr("Path does not exist '" + path + "'");
+      }
+      case ENOTEMPTY:
+        return ::testing::HasSubstr("Directory not empty: '" + dest + "'");
+    }
+    return std::nullopt;
+  }
+
+#define ASSERT_MOVE(src, dest, expected_errno)                                 
         \
+  do {                                                                         
         \
+    auto _src = (src);                                                         
         \
+    auto _dest = (dest);                                                       
         \
+    std::optional<int> _expected_errno = (expected_errno);                     
         \
+    FileInfo _src_info;                                                        
         \
+    ASSERT_OK(                                                                 
         \
+        CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, 
&_src_info)); \
+    auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, 
_dest));        \
+    if (_expected_errno.has_value()) {                                         
         \
+      if (WithErrno(_move_st, *_expected_errno)) {                             
         \
+        /* If the Move failed, the source should remain unchanged. */          
         \
+        AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, 
         \
+                       _src_info.type());                                      
         \
+        auto _message_matcher =                                                
         \
+            MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); 
         \
+        if (_message_matcher.has_value()) {                                    
         \
+          EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, 
_move_st);        \
+        } else {                                                               
         \
+          SUCCEED();                                                           
         \
+        }                                                                      
         \
+      } else {                                                                 
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' did not fail with errno=" << #expected_errno;             
         \
+      }                                                                        
         \
+    } else {                                                                   
         \
+      if (!_move_st.ok()) {                                                    
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' failed with " << _move_st.ToString();                     
         \
+      } else {                                                                 
         \
+        AssertAfterMove(_src, _dest, _src_info.type());                        
         \
+      }                                                                        
         \
+    }                                                                          
         \
+  } while (false)
+
+#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt)
+
+  // Tests for Move()
+
+ public:
+  void TestRenameContainer() {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    auto data = SetUpPreexistingData();
+    // Container exists, so renaming to the same name succeeds because it's a 
no-op.
+    ASSERT_MOVE_OK(data.container_name, data.container_name);
+    // Renaming a container that doesn't exist fails.
+    ASSERT_MOVE("missing-container", "missing-container", ENOENT);
+    ASSERT_MOVE("missing-container", data.container_name, ENOENT);
+    // Renaming a container to an existing non-empty container fails.
+    auto non_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto non_empty_container_client = CreateContainer(non_empty_container);
+    CreateBlob(non_empty_container_client, "object1", 
PreexistingData::kLoremIpsum);
+    ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY);
+    // Renaming to an empty container fails to replace it
+    auto empty_container = PreexistingData::RandomContainerName(rng_);
+    auto empty_container_client = CreateContainer(empty_container);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError,
+        ::testing::HasSubstr("Unable to replace empty container: '" + 
empty_container +
+                             "'"),
+        fs()->Move(data.container_name, empty_container));
+    // Renaming to a non-existing container creates it
+    auto new_container = PreexistingData::RandomContainerName(rng_);
+    AssertFileInfo(fs(), new_container, FileType::NotFound);
+    if (env->backend() == AzureBackend::kAzurite) {
+      // Azurite returns a 201 Created for RenameBlobContainer, but the created
+      // container doesn't contain the blobs from the source container and
+      // the source container remains undeleted after the "rename".

Review Comment:
   Could you report this to Azurite?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1690,17 +1922,413 @@ class AzureFileSystem::Impl {
     }
   }
 
+  /// \brief Create a BlobLeaseClient and acquire a lease on the container.
+  ///
+  /// \param allow_missing_container if true, a nullptr may be returned when 
the container
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes container not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease(
+      const AzureLocation& location, std::chrono::seconds lease_duration,
+      bool allow_missing_container = false, bool retry_allowed = true) {
+    DCHECK(!location.container.empty());
+    auto container_client = GetBlobContainerClient(location.container);
+    auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+    auto container_url = container_client.GetUrl();
+    auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(
+        std::move(container_client), std::move(lease_id));
+    try {
+      auto result = lease_client->Acquire(lease_duration);

Review Comment:
   Do we need `[[maybe_unused]]` here?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1690,17 +1922,413 @@ class AzureFileSystem::Impl {
     }
   }
 
+  /// \brief Create a BlobLeaseClient and acquire a lease on the container.
+  ///
+  /// \param allow_missing_container if true, a nullptr may be returned when 
the container
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes container not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease(
+      const AzureLocation& location, std::chrono::seconds lease_duration,
+      bool allow_missing_container = false, bool retry_allowed = true) {
+    DCHECK(!location.container.empty());
+    auto container_client = GetBlobContainerClient(location.container);
+    auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+    auto container_url = container_client.GetUrl();
+    auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(
+        std::move(container_client), std::move(lease_id));
+    try {
+      auto result = lease_client->Acquire(lease_duration);
+      DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+    } catch (const Storage::StorageException& exception) {
+      if (IsContainerNotFound(exception)) {
+        if (allow_missing_container) {
+          return nullptr;
+        }
+        return PathNotFound(location);
+      } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+                 exception.ErrorCode == "LeaseAlreadyPresent") {
+        if (retry_allowed) {
+          LeaseGuard::WaitUntilLatestKnownExpiryTime();
+          return AcquireContainerLease(location, lease_duration, 
allow_missing_container,
+                                       /*retry_allowed=*/false);
+        }
+      }
+      return ExceptionToStatus(exception, "Failed to acquire a lease on 
container '",
+                               location.container, "': ", container_url);
+    }
+    return lease_client;
+  }
+
+  /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or
+  /// directory if Hierarchical Namespace is supported).
+  ///
+  /// \param allow_missing if true, a nullptr may be returned when the blob
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes blob not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease(

Review Comment:
   It seems that most of this code is duplicated from `AcquireContainerLease()`. Can 
we unify them?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -854,6 +888,397 @@ class TestAzureFileSystem : public ::testing::Test {
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false));
   }
+
+ private:
+  using StringMatcher =
+      
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
+
+  StringMatcher HasDirMoveToSubdirMessage(const std::string& src,
+                                          const std::string& dest) {
+    return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + 
src +
+                                "' a sub-directory of itself.");
+  }
+
+  StringMatcher HasCrossContainerNotImplementedMessage(const std::string& 
container_name,
+                                                       const std::string& 
dest) {
+    return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest 
+
+                                "' requires moving data between "
+                                "containers, which is not implemented.");
+  }
+
+  StringMatcher HasMissingParentDirMessage(const std::string& dest) {
+    return ::testing::HasSubstr("The parent directory of the destination path 
'" + dest +
+                                "' does not exist.");
+  }
+
+  /// \brief Expected POSIX semantics for the rename operation on multiple
+  /// scenarios.
+  ///
+  /// If the src doesn't exist, the error is always ENOENT, otherwise we are
+  /// left with the following combinations:
+  ///
+  /// 1. src's type
+  ///    a. File
+  ///    b. Directory
+  /// 2. dest's existence
+  ///    a. NotFound
+  ///    b. File
+  ///    c. Directory
+  ///       - empty
+  ///       - non-empty
+  /// 3. src path has a trailing slash (or not)
+  /// 4. dest path has a trailing slash (or not)
+  ///
+  /// Limitations: this function doesn't consider paths so it assumes that the
+  /// paths don't lead requests for moves that would make the source a subdir 
of
+  /// the destination.
+  ///
+  /// \param paths_are_equal src and dest paths without trailing slashes are 
equal
+  /// \return std::nullopt if success is expected in the scenario or the errno
+  /// if failure is expected.
+  static std::optional<int> RenameSemantics(FileType src_type, bool 
src_trailing_slash,
+                                            FileType dest_type, bool 
dest_trailing_slash,
+                                            bool dest_is_empty_dir = false,
+                                            bool paths_are_equal = false) {
+    DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown);
+    DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory)
+        << "dest_is_empty_dir must imply dest_type == FileType::Directory";
+    switch (src_type) {
+      case FileType::Unknown:
+        break;
+      case FileType::NotFound:
+        return {ENOENT};
+      case FileType::File:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            if (dest_trailing_slash) {
+              // A slash on the destination path requires that it exists,
+              // so a confirmation that it's a directory can be performed.
+              return {ENOENT};
+            }
+            return {};
+          case FileType::File:
+            if (src_trailing_slash || dest_trailing_slash) {
+              return {ENOTDIR};
+            }
+            // The existing file is replaced successfully.
+            return {};
+          case FileType::Directory:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            return EISDIR;
+        }
+        break;
+      case FileType::Directory:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            // We don't have to care about the slashes when the source is a 
directory.
+            return {};
+          case FileType::File:
+            return {ENOTDIR};
+          case FileType::Directory:
+            if (!paths_are_equal && !dest_is_empty_dir) {
+              return {ENOTEMPTY};
+            }
+            return {};
+        }
+        break;
+    }
+    Unreachable("Invalid parameters passed to RenameSemantics");
+  }
+
+  Status CheckExpectedErrno(const std::string& src, const std::string& dest,
+                            std::optional<int> expected_errno,
+                            const char* expected_errno_name, FileInfo* 
out_src_info) {
+    auto the_fs = fs();
+    const bool src_trailing_slash = internal::HasTrailingSlash(src);
+    const bool dest_trailing_slash = internal::HasTrailingSlash(dest);
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src)};
+    const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)};
+    ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path));
+    ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path));
+    bool dest_is_empty_dir = false;
+    if (dest_info.type() == FileType::Directory) {
+      FileSelector select;
+      select.base_dir = dest_path;
+      select.recursive = false;
+      // TODO(felipecrv): investigate why this can't be false
+      select.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select));
+      if (dest_contents.empty()) {
+        dest_is_empty_dir = true;
+      }
+    }
+    auto paths_are_equal = src_path == dest_path;
+    auto truly_expected_errno =
+        RenameSemantics(out_src_info->type(), src_trailing_slash, 
dest_info.type(),
+                        dest_trailing_slash, dest_is_empty_dir, 
paths_are_equal);
+    if (truly_expected_errno != expected_errno) {
+      if (expected_errno.has_value()) {
+        return Status::Invalid("expected_errno=", expected_errno_name, "=",
+                               *expected_errno,
+                               " used in ASSERT_MOVE is incorrect. "
+                               "POSIX semantics for this scenario require 
errno=",
+                               strerror(truly_expected_errno.value_or(0)));
+      } else {
+        DCHECK(truly_expected_errno.has_value());
+        return Status::Invalid(
+            "ASSERT_MOVE used to assert success in a scenario for which "
+            "POSIX semantics requires errno=",
+            strerror(*truly_expected_errno));
+      }
+    }
+    return Status::OK();
+  }
+
+  void AssertAfterMove(const std::string& src, const std::string& dest, 
FileType type) {
+    if (internal::RemoveTrailingSlash(src) != 
internal::RemoveTrailingSlash(dest)) {
+      AssertFileInfo(fs(), src, FileType::NotFound);
+    }
+    AssertFileInfo(fs(), dest, type);
+  }
+
+  static bool WithErrno(const Status& status, int expected_errno) {
+    auto* detail = status.detail().get();
+    return detail &&
+           arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == 
expected_errno;
+  }
+
+  std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& 
src_info,
+                                                       const std::string& src,
+                                                       const std::string& dest,
+                                                       int for_errno) {
+    switch (for_errno) {
+      case ENOENT: {
+        auto& path = src_info.type() == FileType::NotFound ? src : dest;
+        return ::testing::HasSubstr("Path does not exist '" + path + "'");
+      }
+      case ENOTEMPTY:
+        return ::testing::HasSubstr("Directory not empty: '" + dest + "'");
+    }
+    return std::nullopt;
+  }
+
+#define ASSERT_MOVE(src, dest, expected_errno)                                 
         \
+  do {                                                                         
         \
+    auto _src = (src);                                                         
         \
+    auto _dest = (dest);                                                       
         \
+    std::optional<int> _expected_errno = (expected_errno);                     
         \
+    FileInfo _src_info;                                                        
         \
+    ASSERT_OK(                                                                 
         \
+        CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, 
&_src_info)); \
+    auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, 
_dest));        \
+    if (_expected_errno.has_value()) {                                         
         \
+      if (WithErrno(_move_st, *_expected_errno)) {                             
         \
+        /* If the Move failed, the source should remain unchanged. */          
         \
+        AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, 
         \
+                       _src_info.type());                                      
         \
+        auto _message_matcher =                                                
         \
+            MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); 
         \
+        if (_message_matcher.has_value()) {                                    
         \
+          EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, 
_move_st);        \
+        } else {                                                               
         \
+          SUCCEED();                                                           
         \
+        }                                                                      
         \
+      } else {                                                                 
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' did not fail with errno=" << #expected_errno;             
         \
+      }                                                                        
         \
+    } else {                                                                   
         \
+      if (!_move_st.ok()) {                                                    
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' failed with " << _move_st.ToString();                     
         \
+      } else {                                                                 
         \
+        AssertAfterMove(_src, _dest, _src_info.type());                        
         \
+      }                                                                        
         \
+    }                                                                          
         \
+  } while (false)
+
+#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt)
+
+  // Tests for Move()
+
+ public:
+  void TestRenameContainer() {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    auto data = SetUpPreexistingData();
+    // Container exists, so renaming to the same name succeeds because it's a 
no-op.
+    ASSERT_MOVE_OK(data.container_name, data.container_name);
+    // Renaming a container that doesn't exist fails.
+    ASSERT_MOVE("missing-container", "missing-container", ENOENT);
+    ASSERT_MOVE("missing-container", data.container_name, ENOENT);
+    // Renaming a container to an existing non-empty container fails.
+    auto non_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto non_empty_container_client = CreateContainer(non_empty_container);
+    CreateBlob(non_empty_container_client, "object1", 
PreexistingData::kLoremIpsum);
+    ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY);
+    // Renaming to an empty container fails to replace it
+    auto empty_container = PreexistingData::RandomContainerName(rng_);
+    auto empty_container_client = CreateContainer(empty_container);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError,
+        ::testing::HasSubstr("Unable to replace empty container: '" + 
empty_container +
+                             "'"),
+        fs()->Move(data.container_name, empty_container));
+    // Renaming to a non-existing container creates it
+    auto new_container = PreexistingData::RandomContainerName(rng_);

Review Comment:
   How about using `nonexistent_container` or something?
   I feel that `new_container` doesn't imply that it doesn't exist yet.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1690,17 +1922,413 @@ class AzureFileSystem::Impl {
     }
   }
 
+  /// \brief Create a BlobLeaseClient and acquire a lease on the container.
+  ///
+  /// \param allow_missing_container if true, a nullptr may be returned when 
the container
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes container not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireContainerLease(
+      const AzureLocation& location, std::chrono::seconds lease_duration,
+      bool allow_missing_container = false, bool retry_allowed = true) {
+    DCHECK(!location.container.empty());
+    auto container_client = GetBlobContainerClient(location.container);
+    auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+    auto container_url = container_client.GetUrl();
+    auto lease_client = std::make_unique<Blobs::BlobLeaseClient>(
+        std::move(container_client), std::move(lease_id));
+    try {
+      auto result = lease_client->Acquire(lease_duration);
+      DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+    } catch (const Storage::StorageException& exception) {
+      if (IsContainerNotFound(exception)) {
+        if (allow_missing_container) {
+          return nullptr;
+        }
+        return PathNotFound(location);
+      } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+                 exception.ErrorCode == "LeaseAlreadyPresent") {
+        if (retry_allowed) {
+          LeaseGuard::WaitUntilLatestKnownExpiryTime();
+          return AcquireContainerLease(location, lease_duration, 
allow_missing_container,
+                                       /*retry_allowed=*/false);
+        }
+      }
+      return ExceptionToStatus(exception, "Failed to acquire a lease on 
container '",
+                               location.container, "': ", container_url);
+    }
+    return lease_client;
+  }
+
+  /// \brief Create a BlobLeaseClient and acquire a lease on a blob/file (or
+  /// directory if Hierarchical Namespace is supported).
+  ///
+  /// \param allow_missing if true, a nullptr may be returned when the blob
+  /// doesn't exist, otherwise a PathNotFound(location) error is produced 
right away
+  /// \return A BlobLeaseClient is wrapped as a unique_ptr so it's moveable and
+  /// optional (nullptr denotes blob not found)
+  Result<std::unique_ptr<Blobs::BlobLeaseClient>> AcquireBlobLease(
+      const AzureLocation& location, std::chrono::seconds lease_duration,
+      bool allow_missing = false, bool retry_allowed = true) {
+    DCHECK(!location.container.empty() && !location.path.empty());
+    auto path = std::string{internal::RemoveTrailingSlash(location.path)};
+    auto blob_client = GetBlobClient(location.container, std::move(path));
+    auto lease_id = Blobs::BlobLeaseClient::CreateUniqueLeaseId();
+    auto blob_url = blob_client.GetUrl();
+    auto lease_client = 
std::make_unique<Blobs::BlobLeaseClient>(std::move(blob_client),
+                                                                 
std::move(lease_id));
+    try {
+      [[maybe_unused]] auto result = lease_client->Acquire(lease_duration);
+      DCHECK_EQ(result.Value.LeaseId, lease_client->GetLeaseId());
+    } catch (const Storage::StorageException& exception) {
+      if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
+        if (allow_missing) {
+          return nullptr;
+        }
+        return PathNotFound(location);
+      } else if (exception.StatusCode == Http::HttpStatusCode::Conflict &&
+                 exception.ErrorCode == "LeaseAlreadyPresent") {
+        if (retry_allowed) {
+          LeaseGuard::WaitUntilLatestKnownExpiryTime();
+          return AcquireBlobLease(location, lease_duration, allow_missing,
+                                  /*retry_allowed=*/false);
+        }
+      }
+      return ExceptionToStatus(exception, "Failed to acquire a lease on file 
'",
+                               location.all, "': ", blob_url);
+    }
+    return lease_client;
+  }
+
+  static constexpr auto kLeaseDuration = std::chrono::seconds{15};
+  static constexpr auto kTimeNeededForContainerDeletion = 
std::chrono::seconds{3};
+  static constexpr auto kTimeNeededForContainerRename = 
std::chrono::seconds{3};
+  static constexpr auto kTimeNeededForFileOrDirectoryRename = 
std::chrono::seconds{3};
+  static constexpr auto kTimeNeededForEmptyDirectoryDeletion = 
std::chrono::seconds{3};
+
+  /// The conditions for a successful container rename are derived from the
+  /// conditions for a successful `Move("/$src.container", 
"/$dest.container")`.
+  /// The numbers here match the list in `Moove`.

Review Comment:
   ```suggestion
     /// The numbers here match the list in `Move`.
   ```



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -854,6 +888,397 @@ class TestAzureFileSystem : public ::testing::Test {
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false));
   }
+
+ private:
+  using StringMatcher =
+      
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
+
+  StringMatcher HasDirMoveToSubdirMessage(const std::string& src,
+                                          const std::string& dest) {
+    return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + 
src +
+                                "' a sub-directory of itself.");
+  }
+
+  StringMatcher HasCrossContainerNotImplementedMessage(const std::string& 
container_name,
+                                                       const std::string& 
dest) {
+    return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest 
+
+                                "' requires moving data between "
+                                "containers, which is not implemented.");
+  }
+
+  StringMatcher HasMissingParentDirMessage(const std::string& dest) {
+    return ::testing::HasSubstr("The parent directory of the destination path 
'" + dest +
+                                "' does not exist.");
+  }
+
+  /// \brief Expected POSIX semantics for the rename operation on multiple
+  /// scenarios.
+  ///
+  /// If the src doesn't exist, the error is always ENOENT, otherwise we are
+  /// left with the following combinations:
+  ///
+  /// 1. src's type
+  ///    a. File
+  ///    b. Directory
+  /// 2. dest's existence
+  ///    a. NotFound
+  ///    b. File
+  ///    c. Directory
+  ///       - empty
+  ///       - non-empty
+  /// 3. src path has a trailing slash (or not)
+  /// 4. dest path has a trailing slash (or not)
+  ///
+  /// Limitations: this function doesn't consider paths so it assumes that the
+  /// paths don't lead requests for moves that would make the source a subdir 
of
+  /// the destination.
+  ///
+  /// \param paths_are_equal src and dest paths without trailing slashes are 
equal
+  /// \return std::nullopt if success is expected in the scenario or the errno
+  /// if failure is expected.
+  static std::optional<int> RenameSemantics(FileType src_type, bool 
src_trailing_slash,
+                                            FileType dest_type, bool 
dest_trailing_slash,
+                                            bool dest_is_empty_dir = false,
+                                            bool paths_are_equal = false) {
+    DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown);
+    DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory)
+        << "dest_is_empty_dir must imply dest_type == FileType::Directory";
+    switch (src_type) {
+      case FileType::Unknown:
+        break;
+      case FileType::NotFound:
+        return {ENOENT};
+      case FileType::File:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            if (dest_trailing_slash) {
+              // A slash on the destination path requires that it exists,
+              // so a confirmation that it's a directory can be performed.
+              return {ENOENT};
+            }
+            return {};
+          case FileType::File:
+            if (src_trailing_slash || dest_trailing_slash) {
+              return {ENOTDIR};
+            }
+            // The existing file is replaced successfully.
+            return {};
+          case FileType::Directory:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            return EISDIR;
+        }
+        break;
+      case FileType::Directory:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            // We don't have to care about the slashes when the source is a 
directory.
+            return {};
+          case FileType::File:
+            return {ENOTDIR};
+          case FileType::Directory:
+            if (!paths_are_equal && !dest_is_empty_dir) {
+              return {ENOTEMPTY};
+            }
+            return {};
+        }
+        break;
+    }
+    Unreachable("Invalid parameters passed to RenameSemantics");
+  }
+
+  Status CheckExpectedErrno(const std::string& src, const std::string& dest,
+                            std::optional<int> expected_errno,
+                            const char* expected_errno_name, FileInfo* 
out_src_info) {
+    auto the_fs = fs();
+    const bool src_trailing_slash = internal::HasTrailingSlash(src);
+    const bool dest_trailing_slash = internal::HasTrailingSlash(dest);
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src)};
+    const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)};
+    ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path));
+    ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path));
+    bool dest_is_empty_dir = false;
+    if (dest_info.type() == FileType::Directory) {
+      FileSelector select;
+      select.base_dir = dest_path;
+      select.recursive = false;
+      // TODO(felipecrv): investigate why this can't be false
+      select.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select));
+      if (dest_contents.empty()) {
+        dest_is_empty_dir = true;
+      }
+    }
+    auto paths_are_equal = src_path == dest_path;
+    auto truly_expected_errno =
+        RenameSemantics(out_src_info->type(), src_trailing_slash, 
dest_info.type(),
+                        dest_trailing_slash, dest_is_empty_dir, 
paths_are_equal);
+    if (truly_expected_errno != expected_errno) {
+      if (expected_errno.has_value()) {
+        return Status::Invalid("expected_errno=", expected_errno_name, "=",
+                               *expected_errno,
+                               " used in ASSERT_MOVE is incorrect. "
+                               "POSIX semantics for this scenario require 
errno=",
+                               strerror(truly_expected_errno.value_or(0)));
+      } else {
+        DCHECK(truly_expected_errno.has_value());
+        return Status::Invalid(
+            "ASSERT_MOVE used to assert success in a scenario for which "
+            "POSIX semantics requires errno=",
+            strerror(*truly_expected_errno));
+      }
+    }
+    return Status::OK();
+  }
+
+  void AssertAfterMove(const std::string& src, const std::string& dest, 
FileType type) {
+    if (internal::RemoveTrailingSlash(src) != 
internal::RemoveTrailingSlash(dest)) {
+      AssertFileInfo(fs(), src, FileType::NotFound);
+    }
+    AssertFileInfo(fs(), dest, type);
+  }
+
+  static bool WithErrno(const Status& status, int expected_errno) {
+    auto* detail = status.detail().get();
+    return detail &&
+           arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == 
expected_errno;
+  }
+
+  std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& 
src_info,
+                                                       const std::string& src,
+                                                       const std::string& dest,
+                                                       int for_errno) {
+    switch (for_errno) {
+      case ENOENT: {
+        auto& path = src_info.type() == FileType::NotFound ? src : dest;
+        return ::testing::HasSubstr("Path does not exist '" + path + "'");
+      }
+      case ENOTEMPTY:
+        return ::testing::HasSubstr("Directory not empty: '" + dest + "'");
+    }
+    return std::nullopt;
+  }
+
+#define ASSERT_MOVE(src, dest, expected_errno)                                 
         \
+  do {                                                                         
         \
+    auto _src = (src);                                                         
         \
+    auto _dest = (dest);                                                       
         \
+    std::optional<int> _expected_errno = (expected_errno);                     
         \
+    FileInfo _src_info;                                                        
         \
+    ASSERT_OK(                                                                 
         \
+        CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, 
&_src_info)); \
+    auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, 
_dest));        \
+    if (_expected_errno.has_value()) {                                         
         \
+      if (WithErrno(_move_st, *_expected_errno)) {                             
         \
+        /* If the Move failed, the source should remain unchanged. */          
         \
+        AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, 
         \
+                       _src_info.type());                                      
         \
+        auto _message_matcher =                                                
         \
+            MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); 
         \
+        if (_message_matcher.has_value()) {                                    
         \
+          EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, 
_move_st);        \
+        } else {                                                               
         \
+          SUCCEED();                                                           
         \
+        }                                                                      
         \
+      } else {                                                                 
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' did not fail with errno=" << #expected_errno;             
         \
+      }                                                                        
         \
+    } else {                                                                   
         \
+      if (!_move_st.ok()) {                                                    
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' failed with " << _move_st.ToString();                     
         \
+      } else {                                                                 
         \
+        AssertAfterMove(_src, _dest, _src_info.type());                        
         \
+      }                                                                        
         \
+    }                                                                          
         \
+  } while (false)
+
+#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt)
+
+  // Tests for Move()
+
+ public:
+  void TestRenameContainer() {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    auto data = SetUpPreexistingData();
+    // Container exists, so renaming to the same name succeeds because it's a 
no-op.
+    ASSERT_MOVE_OK(data.container_name, data.container_name);
+    // Renaming a container that doesn't exist fails.
+    ASSERT_MOVE("missing-container", "missing-container", ENOENT);
+    ASSERT_MOVE("missing-container", data.container_name, ENOENT);
+    // Renaming a container to an existing non-empty container fails.
+    auto non_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto non_empty_container_client = CreateContainer(non_empty_container);
+    CreateBlob(non_empty_container_client, "object1", 
PreexistingData::kLoremIpsum);
+    ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY);
+    // Renaming to an empty container fails to replace it
+    auto empty_container = PreexistingData::RandomContainerName(rng_);
+    auto empty_container_client = CreateContainer(empty_container);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError,
+        ::testing::HasSubstr("Unable to replace empty container: '" + 
empty_container +
+                             "'"),
+        fs()->Move(data.container_name, empty_container));
+    // Renaming to a non-existing container creates it
+    auto new_container = PreexistingData::RandomContainerName(rng_);
+    AssertFileInfo(fs(), new_container, FileType::NotFound);
+    if (env->backend() == AzureBackend::kAzurite) {
+      // Azurite returns a 201 Created for RenameBlobContainer, but the created
+      // container doesn't contain the blobs from the source container and
+      // the source container remains undeleted after the "rename".
+    } else {
+      // See Azure SDK issue/question:
+      // https://github.com/Azure/azure-sdk-for-cpp/issues/5262
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          IOError,
+          ::testing::HasSubstr("The 'rename' operation is not supported on 
containers."),
+          fs()->Move(data.container_name, new_container));
+      // ASSERT_MOVE_OK(data.container_name, new_container);
+      // AssertFileInfo(fs(),
+      //                ConcatAbstractPath(new_container, 
PreexistingData::kObjectName),
+      //                FileType::File);
+    }
+    // Renaming to an empty container can work if the source is also empty
+    auto new_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto new_empty_container_client = CreateContainer(new_empty_container);
+    ASSERT_MOVE_OK(empty_container, new_empty_container);
+  }
+
+  void TestMoveContainerToPath() {
+    auto data = SetUpPreexistingData();
+    ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        HasDirMoveToSubdirMessage(data.container_name, 
data.ContainerPath("new-subdir")),
+        fs()->Move(data.container_name, data.ContainerPath("new-subdir")));
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(data.container_name,
+                                               "a-container/new-subdir"),
+        fs()->Move(data.container_name, "a-container/new-subdir"));
+  }
+
+  void TestCreateContainerFromPath() {
+    auto data = SetUpPreexistingData();
+    auto missing_path = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path, "new-container", ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        ::testing::HasSubstr("Creating files at '/' is not possible, only 
directories."),
+        fs()->Move(data.ObjectPath(), "new-file"));
+    auto src_dir_path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs()->CreateDir(src_dir_path, false));
+    EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path));
+    EXPECT_EQ(src_dir_info.type(), FileType::Directory);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"),
+        fs()->Move(src_dir_path, "new-container"));
+  }
+
+  void TestMovePaths() {
+    Status st;
+    auto data = SetUpPreexistingData();
+    // When source doesn't exist.
+    ASSERT_MOVE("missing-container/src-path", "a-container/dest-path", ENOENT);
+    auto missing_path1 = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path1, "missing-container/path", ENOENT);
+
+    // But when source exists...
+    if (!WithHierarchicalNamespace()) {
+      // ...and containers are different, we get an error message telling 
cross-container
+      // moves are not implemented.
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          NotImplemented,
+          HasCrossContainerNotImplementedMessage(data.ObjectPath(),
+                                                 "missing-container/path"),
+          fs()->Move(data.ObjectPath(), "missing-container/path"));
+      GTEST_SKIP()
+          << "The rest of TestMovePaths is not implemented for non-HNS 
scenarios";
+    }
+    auto adlfs_client =
+        datalake_service_client_->GetFileSystemClient(data.container_name);
+    // ...and dest.container doesn't exist.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage("missing-container/path"),
+        fs()->Move(data.ObjectPath(), "missing-container/path"));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage(data.Path("missing-subdir/file")),
+        fs()->Move(data.ObjectPath(), data.Path("missing-subdir/file")));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    // src is a file and dest does not exist
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("file1/"), ENOENT);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1/"), ENOTDIR);
+    // "file0" exists
+
+    // src is a file and dest exists (as a file)
+    CreateFile(adlfs_client, PreexistingData::kObjectName, 
PreexistingData::kLoremIpsum);
+    CreateFile(adlfs_client, "file1", PreexistingData::kLoremIpsum);
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1"), data.Path("file0/"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0/"), ENOTDIR);
+    // "file1" and "file2" exist

Review Comment:
   ```suggestion
       // "file0" and "file1" exist
   ```



##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -210,6 +210,25 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {
 
   Status DeleteFile(const std::string& path) override;
 
+  /// \brief Move / rename a file or directory.
+  ///
+  /// There are no files immediately at the root directory, so paths like
+  /// "/segment" always refer to a container of the storage account and are
+  /// treated as directories.
+  ///
+  /// If `dest` exists but the operation fails for some reason, `Move`
+  /// guarantees `dest` is not lost.
+  ///
+  /// Conditions for a successful move:
+  /// 1. `src` must exist.
+  /// 2. `dest` can't contain a strict path prefix of `src`. More generally,
+  ///    a directory can't be made a subdirectory of itself.
+  /// 3. If `dest` already exists and its a file, `src` must also be a file.
+  ///    `dest` is then replaced by `src`.
+  /// 4. All components of `dest` must exist, except for the last.
+  /// 5. If `dest` already exists and its a directory, `src` must also be a

Review Comment:
   ```suggestion
     /// 5. If `dest` already exists and it's a directory, `src` must also be a
   ```



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -854,6 +888,397 @@ class TestAzureFileSystem : public ::testing::Test {
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false));
   }
+
+ private:
+  using StringMatcher =
+      
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
+
+  StringMatcher HasDirMoveToSubdirMessage(const std::string& src,
+                                          const std::string& dest) {
+    return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + 
src +
+                                "' a sub-directory of itself.");
+  }
+
+  StringMatcher HasCrossContainerNotImplementedMessage(const std::string& 
container_name,
+                                                       const std::string& 
dest) {
+    return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest 
+
+                                "' requires moving data between "
+                                "containers, which is not implemented.");
+  }
+
+  StringMatcher HasMissingParentDirMessage(const std::string& dest) {
+    return ::testing::HasSubstr("The parent directory of the destination path 
'" + dest +
+                                "' does not exist.");
+  }
+
+  /// \brief Expected POSIX semantics for the rename operation on multiple
+  /// scenarios.
+  ///
+  /// If the src doesn't exist, the error is always ENOENT, otherwise we are
+  /// left with the following combinations:
+  ///
+  /// 1. src's type
+  ///    a. File
+  ///    b. Directory
+  /// 2. dest's existence
+  ///    a. NotFound
+  ///    b. File
+  ///    c. Directory
+  ///       - empty
+  ///       - non-empty
+  /// 3. src path has a trailing slash (or not)
+  /// 4. dest path has a trailing slash (or not)
+  ///
+  /// Limitations: this function doesn't consider paths so it assumes that the
+  /// paths don't lead requests for moves that would make the source a subdir 
of
+  /// the destination.
+  ///
+  /// \param paths_are_equal src and dest paths without trailing slashes are 
equal
+  /// \return std::nullopt if success is expected in the scenario or the errno
+  /// if failure is expected.
+  static std::optional<int> RenameSemantics(FileType src_type, bool 
src_trailing_slash,
+                                            FileType dest_type, bool 
dest_trailing_slash,
+                                            bool dest_is_empty_dir = false,
+                                            bool paths_are_equal = false) {
+    DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown);
+    DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory)
+        << "dest_is_empty_dir must imply dest_type == FileType::Directory";
+    switch (src_type) {
+      case FileType::Unknown:
+        break;
+      case FileType::NotFound:
+        return {ENOENT};
+      case FileType::File:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            if (dest_trailing_slash) {
+              // A slash on the destination path requires that it exists,
+              // so a confirmation that it's a directory can be performed.
+              return {ENOENT};
+            }
+            return {};
+          case FileType::File:
+            if (src_trailing_slash || dest_trailing_slash) {
+              return {ENOTDIR};
+            }
+            // The existing file is replaced successfuly.
+            return {};
+          case FileType::Directory:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            return EISDIR;
+        }
+        break;
+      case FileType::Directory:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            // We don't have to care about the slashes when the source is a 
directory.
+            return {};
+          case FileType::File:
+            return {ENOTDIR};
+          case FileType::Directory:
+            if (!paths_are_equal && !dest_is_empty_dir) {
+              return {ENOTEMPTY};
+            }
+            return {};
+        }
+        break;
+    }
+    Unreachable("Invalid parameters passed to RenameSemantics");
+  }
+
+  Status CheckExpectedErrno(const std::string& src, const std::string& dest,
+                            std::optional<int> expected_errno,
+                            const char* expected_errno_name, FileInfo* 
out_src_info) {
+    auto the_fs = fs();
+    const bool src_trailing_slash = internal::HasTrailingSlash(src);
+    const bool dest_trailing_slash = internal::HasTrailingSlash(dest);
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src)};
+    const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)};
+    ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path));
+    ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path));
+    bool dest_is_empty_dir = false;
+    if (dest_info.type() == FileType::Directory) {
+      FileSelector select;
+      select.base_dir = dest_path;
+      select.recursive = false;
+      // TODO(felipecrv): investigate why this can't be false
+      select.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select));
+      if (dest_contents.empty()) {
+        dest_is_empty_dir = true;
+      }
+    }
+    auto paths_are_equal = src_path == dest_path;
+    auto truly_expected_errno =
+        RenameSemantics(out_src_info->type(), src_trailing_slash, 
dest_info.type(),
+                        dest_trailing_slash, dest_is_empty_dir, 
paths_are_equal);
+    if (truly_expected_errno != expected_errno) {
+      if (expected_errno.has_value()) {
+        return Status::Invalid("expected_errno=", expected_errno_name, "=",
+                               *expected_errno,
+                               " used in ASSERT_MOVE is incorrect. "
+                               "POSIX semantics for this scenario require 
errno=",
+                               strerror(truly_expected_errno.value_or(0)));
+      } else {
+        DCHECK(truly_expected_errno.has_value());
+        return Status::Invalid(
+            "ASSERT_MOVE used to assert success in a scenario for which "
+            "POSIX semantics requires errno=",
+            strerror(*truly_expected_errno));
+      }
+    }
+    return Status::OK();
+  }
+
+  void AssertAfterMove(const std::string& src, const std::string& dest, 
FileType type) {
+    if (internal::RemoveTrailingSlash(src) != 
internal::RemoveTrailingSlash(dest)) {
+      AssertFileInfo(fs(), src, FileType::NotFound);
+    }
+    AssertFileInfo(fs(), dest, type);
+  }
+
+  static bool WithErrno(const Status& status, int expected_errno) {
+    auto* detail = status.detail().get();
+    return detail &&
+           arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == 
expected_errno;
+  }
+
+  std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& 
src_info,
+                                                       const std::string& src,
+                                                       const std::string& dest,
+                                                       int for_errno) {
+    switch (for_errno) {
+      case ENOENT: {
+        auto& path = src_info.type() == FileType::NotFound ? src : dest;
+        return ::testing::HasSubstr("Path does not exist '" + path + "'");
+      }
+      case ENOTEMPTY:
+        return ::testing::HasSubstr("Directory not empty: '" + dest + "'");
+    }
+    return std::nullopt;
+  }
+
+#define ASSERT_MOVE(src, dest, expected_errno)                                 
         \
+  do {                                                                         
         \
+    auto _src = (src);                                                         
         \
+    auto _dest = (dest);                                                       
         \
+    std::optional<int> _expected_errno = (expected_errno);                     
         \
+    FileInfo _src_info;                                                        
         \
+    ASSERT_OK(                                                                 
         \
+        CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, 
&_src_info)); \
+    auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, 
_dest));        \
+    if (_expected_errno.has_value()) {                                         
         \
+      if (WithErrno(_move_st, *_expected_errno)) {                             
         \
+        /* If the Move failed, the source should remain unchanged. */          
         \
+        AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, 
         \
+                       _src_info.type());                                      
         \
+        auto _message_matcher =                                                
         \
+            MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); 
         \
+        if (_message_matcher.has_value()) {                                    
         \
+          EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, 
_move_st);        \
+        } else {                                                               
         \
+          SUCCEED();                                                           
         \
+        }                                                                      
         \
+      } else {                                                                 
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' did not fail with errno=" << #expected_errno;             
         \
+      }                                                                        
         \
+    } else {                                                                   
         \
+      if (!_move_st.ok()) {                                                    
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' failed with " << _move_st.ToString();                     
         \
+      } else {                                                                 
         \
+        AssertAfterMove(_src, _dest, _src_info.type());                        
         \
+      }                                                                        
         \
+    }                                                                          
         \
+  } while (false)
+
+#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt)
+
+  // Tests for Move()
+
+ public:
+  void TestRenameContainer() {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    auto data = SetUpPreexistingData();
+    // Container exists, so renaming to the same name succeeds because it's a 
no-op.
+    ASSERT_MOVE_OK(data.container_name, data.container_name);
+    // Renaming a container that doesn't exist fails.
+    ASSERT_MOVE("missing-container", "missing-container", ENOENT);
+    ASSERT_MOVE("missing-container", data.container_name, ENOENT);
+    // Renaming a container to an existing non-empty container fails.
+    auto non_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto non_empty_container_client = CreateContainer(non_empty_container);
+    CreateBlob(non_empty_container_client, "object1", 
PreexistingData::kLoremIpsum);
+    ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY);
+    // Renaming to an empty container fails to replace it
+    auto empty_container = PreexistingData::RandomContainerName(rng_);
+    auto empty_container_client = CreateContainer(empty_container);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError,
+        ::testing::HasSubstr("Unable to replace empty container: '" + 
empty_container +
+                             "'"),
+        fs()->Move(data.container_name, empty_container));
+    // Renaming to a non-existing container creates it
+    auto new_container = PreexistingData::RandomContainerName(rng_);
+    AssertFileInfo(fs(), new_container, FileType::NotFound);
+    if (env->backend() == AzureBackend::kAzurite) {
+      // Azurite returns a 201 Created for RenameBlobContainer, but the created
+      // container doesn't contain the blobs from the source container and
+      // the source container reamins undeleted after the "rename".
+    } else {
+      // See Azure SDK issue/question:
+      // https://github.com/Azure/azure-sdk-for-cpp/issues/5262
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          IOError,
+          ::testing::HasSubstr("The 'rename' operation is not supported on 
containers."),
+          fs()->Move(data.container_name, new_container));
+      // ASSERT_MOVE_OK(data.container_name, new_container);
+      // AssertFileInfo(fs(),
+      //                ConcatAbstractPath(new_container, 
PreexistingData::kObjectName),
+      //                FileType::File);
+    }
+    // Renaming to an empty container can work if the source is also empty
+    auto new_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto new_empty_container_client = CreateContainer(new_empty_container);
+    ASSERT_MOVE_OK(empty_container, new_empty_container);
+  }
+
+  void TestMoveContainerToPath() {
+    auto data = SetUpPreexistingData();
+    ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        HasDirMoveToSubdirMessage(data.container_name, 
data.ContainerPath("new-subdir")),
+        fs()->Move(data.container_name, data.ContainerPath("new-subdir")));
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(data.container_name,
+                                               "a-container/new-subdir"),
+        fs()->Move(data.container_name, "a-container/new-subdir"));
+  }
+
+  void TestCreateContainerFromPath() {
+    auto data = SetUpPreexistingData();
+    auto missing_path = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path, "new-container", ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        ::testing::HasSubstr("Creating files at '/' is not possible, only 
directories."),
+        fs()->Move(data.ObjectPath(), "new-file"));
+    auto src_dir_path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs()->CreateDir(src_dir_path, false));
+    EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path));
+    EXPECT_EQ(src_dir_info.type(), FileType::Directory);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"),
+        fs()->Move(src_dir_path, "new-container"));
+  }
+
+  void TestMovePaths() {
+    Status st;
+    auto data = SetUpPreexistingData();
+    // When source doesn't exist.
+    ASSERT_MOVE("missing-container/src-path", "a-container/dest-path", ENOENT);
+    auto missing_path1 = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path1, "missing-container/path", ENOENT);
+
+    // But when source exists...
+    if (!WithHierarchicalNamespace()) {
+      // ...and containers are different, we get an error message telling 
cross-container
+      // moves are not implemented.
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          NotImplemented,
+          HasCrossContainerNotImplementedMessage(data.ObjectPath(),
+                                                 "missing-container/path"),
+          fs()->Move(data.ObjectPath(), "missing-container/path"));
+      GTEST_SKIP()
+          << "The rest of TestMovePaths is not implemented for non-HNS 
scenarios";
+    }
+    auto adlfs_client =
+        datalake_service_client_->GetFileSystemClient(data.container_name);
+    // ...and dest.container doesn't exist.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage("missing-container/path"),
+        fs()->Move(data.ObjectPath(), "missing-container/path"));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage(data.Path("missing-subdir/file")),
+        fs()->Move(data.ObjectPath(), data.Path("missing-subdir/file")));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    // src is a file and dest does not exists
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("file1/"), ENOENT);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1/"), ENOTDIR);
+    // "file0" exists
+
+    // src is a file and dest exists (as a file)
+    CreateFile(adlfs_client, PreexistingData::kObjectName, 
PreexistingData::kLoremIpsum);
+    CreateFile(adlfs_client, "file1", PreexistingData::kLoremIpsum);
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1"), data.Path("file0/"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0/"), ENOTDIR);
+    // "file1" and "file2" exist
+
+    // src is a file and dest exists (as an empty dir)
+    CreateDirectory(adlfs_client, "subdir0");
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR);
+
+    // src is a file and dest exists (as a non-empty dir)
+    CreateFile(adlfs_client, "subdir0/file-at-subdir");
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR);
+    // "subdir0/file-at-subdir" exists
+
+    // src is a directory and dest does not exists
+    CreateDirectory(adlfs_client, "subdir0");

Review Comment:
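   This second `CreateDirectory(adlfs_client, "subdir0")` call seems unnecessary: "subdir0" was already created earlier in this test and still exists at this point.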



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -854,6 +888,397 @@ class TestAzureFileSystem : public ::testing::Test {
     const auto directory_path = data.RandomDirectoryPath(rng_);
     ASSERT_RAISES(IOError, fs()->DeleteDirContents(directory_path, false));
   }
+
+ private:
+  using StringMatcher =
+      
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher<std::string>>;
+
+  StringMatcher HasDirMoveToSubdirMessage(const std::string& src,
+                                          const std::string& dest) {
+    return ::testing::HasSubstr("Cannot Move to '" + dest + "' and make '" + 
src +
+                                "' a sub-directory of itself.");
+  }
+
+  StringMatcher HasCrossContainerNotImplementedMessage(const std::string& 
container_name,
+                                                       const std::string& 
dest) {
+    return ::testing::HasSubstr("Move of '" + container_name + "' to '" + dest 
+
+                                "' requires moving data between "
+                                "containers, which is not implemented.");
+  }
+
+  StringMatcher HasMissingParentDirMessage(const std::string& dest) {
+    return ::testing::HasSubstr("The parent directory of the destination path 
'" + dest +
+                                "' does not exist.");
+  }
+
+  /// \brief Expected POSIX semantics for the rename operation on multiple
+  /// scenarios.
+  ///
+  /// If the src doesn't exist, the error is always ENOENT, otherwise we are
+  /// left with the following combinations:
+  ///
+  /// 1. src's type
+  ///    a. File
+  ///    b. Directory
+  /// 2. dest's existence
+  ///    a. NotFound
+  ///    b. File
+  ///    c. Directory
+  ///       - empty
+  ///       - non-empty
+  /// 3. src path has a trailing slash (or not)
+  /// 4. dest path has a trailing slash (or not)
+  ///
+  /// Limitations: this function doesn't consider paths so it assumes that the
+  /// paths don't lead requests for moves that would make the source a subdir 
of
+  /// the destination.
+  ///
+  /// \param paths_are_equal src and dest paths without trailing slashes are 
equal
+  /// \return std::nullopt if success is expected in the scenario or the errno
+  /// if failure is expected.
+  static std::optional<int> RenameSemantics(FileType src_type, bool 
src_trailing_slash,
+                                            FileType dest_type, bool 
dest_trailing_slash,
+                                            bool dest_is_empty_dir = false,
+                                            bool paths_are_equal = false) {
+    DCHECK(src_type != FileType::Unknown && dest_type != FileType::Unknown);
+    DCHECK(!dest_is_empty_dir || dest_type == FileType::Directory)
+        << "dest_is_empty_dir must imply dest_type == FileType::Directory";
+    switch (src_type) {
+      case FileType::Unknown:
+        break;
+      case FileType::NotFound:
+        return {ENOENT};
+      case FileType::File:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            if (dest_trailing_slash) {
+              // A slash on the destination path requires that it exists,
+              // so a confirmation that it's a directory can be performed.
+              return {ENOENT};
+            }
+            return {};
+          case FileType::File:
+            if (src_trailing_slash || dest_trailing_slash) {
+              return {ENOTDIR};
+            }
+            // The existing file is replaced successfuly.
+            return {};
+          case FileType::Directory:
+            if (src_trailing_slash) {
+              return {ENOTDIR};
+            }
+            return EISDIR;
+        }
+        break;
+      case FileType::Directory:
+        switch (dest_type) {
+          case FileType::Unknown:
+            break;
+          case FileType::NotFound:
+            // We don't have to care about the slashes when the source is a 
directory.
+            return {};
+          case FileType::File:
+            return {ENOTDIR};
+          case FileType::Directory:
+            if (!paths_are_equal && !dest_is_empty_dir) {
+              return {ENOTEMPTY};
+            }
+            return {};
+        }
+        break;
+    }
+    Unreachable("Invalid parameters passed to RenameSemantics");
+  }
+
+  Status CheckExpectedErrno(const std::string& src, const std::string& dest,
+                            std::optional<int> expected_errno,
+                            const char* expected_errno_name, FileInfo* 
out_src_info) {
+    auto the_fs = fs();
+    const bool src_trailing_slash = internal::HasTrailingSlash(src);
+    const bool dest_trailing_slash = internal::HasTrailingSlash(dest);
+    const auto src_path = std::string{internal::RemoveTrailingSlash(src)};
+    const auto dest_path = std::string{internal::RemoveTrailingSlash(dest)};
+    ARROW_ASSIGN_OR_RAISE(*out_src_info, the_fs->GetFileInfo(src_path));
+    ARROW_ASSIGN_OR_RAISE(auto dest_info, the_fs->GetFileInfo(dest_path));
+    bool dest_is_empty_dir = false;
+    if (dest_info.type() == FileType::Directory) {
+      FileSelector select;
+      select.base_dir = dest_path;
+      select.recursive = false;
+      // TODO(felipecrv): investigate why this can't be false
+      select.allow_not_found = true;
+      ARROW_ASSIGN_OR_RAISE(auto dest_contents, the_fs->GetFileInfo(select));
+      if (dest_contents.empty()) {
+        dest_is_empty_dir = true;
+      }
+    }
+    auto paths_are_equal = src_path == dest_path;
+    auto truly_expected_errno =
+        RenameSemantics(out_src_info->type(), src_trailing_slash, 
dest_info.type(),
+                        dest_trailing_slash, dest_is_empty_dir, 
paths_are_equal);
+    if (truly_expected_errno != expected_errno) {
+      if (expected_errno.has_value()) {
+        return Status::Invalid("expected_errno=", expected_errno_name, "=",
+                               *expected_errno,
+                               " used in ASSERT_MOVE is incorrect. "
+                               "POSIX semantics for this scenario require 
errno=",
+                               strerror(truly_expected_errno.value_or(0)));
+      } else {
+        DCHECK(truly_expected_errno.has_value());
+        return Status::Invalid(
+            "ASSERT_MOVE used to assert success in a scenario for which "
+            "POSIX semantics requires errno=",
+            strerror(*truly_expected_errno));
+      }
+    }
+    return Status::OK();
+  }
+
+  void AssertAfterMove(const std::string& src, const std::string& dest, 
FileType type) {
+    if (internal::RemoveTrailingSlash(src) != 
internal::RemoveTrailingSlash(dest)) {
+      AssertFileInfo(fs(), src, FileType::NotFound);
+    }
+    AssertFileInfo(fs(), dest, type);
+  }
+
+  static bool WithErrno(const Status& status, int expected_errno) {
+    auto* detail = status.detail().get();
+    return detail &&
+           arrow::internal::ErrnoFromStatusDetail(*detail).value_or(-1) == 
expected_errno;
+  }
+
+  std::optional<StringMatcher> MoveErrorMessageMatcher(const FileInfo& 
src_info,
+                                                       const std::string& src,
+                                                       const std::string& dest,
+                                                       int for_errno) {
+    switch (for_errno) {
+      case ENOENT: {
+        auto& path = src_info.type() == FileType::NotFound ? src : dest;
+        return ::testing::HasSubstr("Path does not exist '" + path + "'");
+      }
+      case ENOTEMPTY:
+        return ::testing::HasSubstr("Directory not empty: '" + dest + "'");
+    }
+    return std::nullopt;
+  }
+
+#define ASSERT_MOVE(src, dest, expected_errno)                                 
         \
+  do {                                                                         
         \
+    auto _src = (src);                                                         
         \
+    auto _dest = (dest);                                                       
         \
+    std::optional<int> _expected_errno = (expected_errno);                     
         \
+    FileInfo _src_info;                                                        
         \
+    ASSERT_OK(                                                                 
         \
+        CheckExpectedErrno(_src, _dest, _expected_errno, #expected_errno, 
&_src_info)); \
+    auto _move_st = ::arrow::internal::GenericToStatus(fs()->Move(_src, 
_dest));        \
+    if (_expected_errno.has_value()) {                                         
         \
+      if (WithErrno(_move_st, *_expected_errno)) {                             
         \
+        /* If the Move failed, the source should remain unchanged. */          
         \
+        AssertFileInfo(fs(), std::string{internal::RemoveTrailingSlash(_src)}, 
         \
+                       _src_info.type());                                      
         \
+        auto _message_matcher =                                                
         \
+            MoveErrorMessageMatcher(_src_info, _src, _dest, *_expected_errno); 
         \
+        if (_message_matcher.has_value()) {                                    
         \
+          EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, *_message_matcher, 
_move_st);        \
+        } else {                                                               
         \
+          SUCCEED();                                                           
         \
+        }                                                                      
         \
+      } else {                                                                 
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' did not fail with errno=" << #expected_errno;             
         \
+      }                                                                        
         \
+    } else {                                                                   
         \
+      if (!_move_st.ok()) {                                                    
         \
+        FAIL() << "Move '" ARROW_STRINGIFY(src) "' to '" ARROW_STRINGIFY(dest) 
         \
+               << "' failed with " << _move_st.ToString();                     
         \
+      } else {                                                                 
         \
+        AssertAfterMove(_src, _dest, _src_info.type());                        
         \
+      }                                                                        
         \
+    }                                                                          
         \
+  } while (false)
+
+#define ASSERT_MOVE_OK(src, dest) ASSERT_MOVE((src), (dest), std::nullopt)
+
+  // Tests for Move()
+
+ public:
+  void TestRenameContainer() {
+    EXPECT_OK_AND_ASSIGN(auto env, GetAzureEnv());
+    auto data = SetUpPreexistingData();
+    // Container exists, so renaming to the same name succeeds because it's a 
no-op.
+    ASSERT_MOVE_OK(data.container_name, data.container_name);
+    // Renaming a container that doesn't exist fails.
+    ASSERT_MOVE("missing-container", "missing-container", ENOENT);
+    ASSERT_MOVE("missing-container", data.container_name, ENOENT);
+    // Renaming a container to an existing non-empty container fails.
+    auto non_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto non_empty_container_client = CreateContainer(non_empty_container);
+    CreateBlob(non_empty_container_client, "object1", 
PreexistingData::kLoremIpsum);
+    ASSERT_MOVE(data.container_name, non_empty_container, ENOTEMPTY);
+    // Renaming to an empty container fails to replace it
+    auto empty_container = PreexistingData::RandomContainerName(rng_);
+    auto empty_container_client = CreateContainer(empty_container);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError,
+        ::testing::HasSubstr("Unable to replace empty container: '" + 
empty_container +
+                             "'"),
+        fs()->Move(data.container_name, empty_container));
+    // Renaming to a non-existing container creates it
+    auto new_container = PreexistingData::RandomContainerName(rng_);
+    AssertFileInfo(fs(), new_container, FileType::NotFound);
+    if (env->backend() == AzureBackend::kAzurite) {
+      // Azurite returns a 201 Created for RenameBlobContainer, but the created
+      // container doesn't contain the blobs from the source container and
+      // the source container reamins undeleted after the "rename".
+    } else {
+      // See Azure SDK issue/question:
+      // https://github.com/Azure/azure-sdk-for-cpp/issues/5262
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          IOError,
+          ::testing::HasSubstr("The 'rename' operation is not supported on 
containers."),
+          fs()->Move(data.container_name, new_container));
+      // ASSERT_MOVE_OK(data.container_name, new_container);
+      // AssertFileInfo(fs(),
+      //                ConcatAbstractPath(new_container, 
PreexistingData::kObjectName),
+      //                FileType::File);
+    }
+    // Renaming to an empty container can work if the source is also empty
+    auto new_empty_container = PreexistingData::RandomContainerName(rng_);
+    auto new_empty_container_client = CreateContainer(new_empty_container);
+    ASSERT_MOVE_OK(empty_container, new_empty_container);
+  }
+
+  void TestMoveContainerToPath() {
+    auto data = SetUpPreexistingData();
+    ASSERT_MOVE("missing-container", data.ContainerPath("new-subdir"), ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        HasDirMoveToSubdirMessage(data.container_name, 
data.ContainerPath("new-subdir")),
+        fs()->Move(data.container_name, data.ContainerPath("new-subdir")));
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(data.container_name,
+                                               "a-container/new-subdir"),
+        fs()->Move(data.container_name, "a-container/new-subdir"));
+  }
+
+  void TestCreateContainerFromPath() {
+    auto data = SetUpPreexistingData();
+    auto missing_path = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path, "new-container", ENOENT);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid,
+        ::testing::HasSubstr("Creating files at '/' is not possible, only 
directories."),
+        fs()->Move(data.ObjectPath(), "new-file"));
+    auto src_dir_path = data.RandomDirectoryPath(rng_);
+    ASSERT_OK(fs()->CreateDir(src_dir_path, false));
+    EXPECT_OK_AND_ASSIGN(auto src_dir_info, fs()->GetFileInfo(src_dir_path));
+    EXPECT_EQ(src_dir_info.type(), FileType::Directory);
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented,
+        HasCrossContainerNotImplementedMessage(src_dir_path, "new-container"),
+        fs()->Move(src_dir_path, "new-container"));
+  }
+
+  void TestMovePaths() {
+    Status st;
+    auto data = SetUpPreexistingData();
+    // When source doesn't exist.
+    ASSERT_MOVE("missing-container/src-path", "a-container/dest-path", ENOENT);
+    auto missing_path1 = data.RandomDirectoryPath(rng_);
+    ASSERT_MOVE(missing_path1, "missing-container/path", ENOENT);
+
+    // But when source exists...
+    if (!WithHierarchicalNamespace()) {
+      // ...and containers are different, we get an error message telling 
cross-container
+      // moves are not implemented.
+      EXPECT_RAISES_WITH_MESSAGE_THAT(
+          NotImplemented,
+          HasCrossContainerNotImplementedMessage(data.ObjectPath(),
+                                                 "missing-container/path"),
+          fs()->Move(data.ObjectPath(), "missing-container/path"));
+      GTEST_SKIP()
+          << "The rest of TestMovePaths is not implemented for non-HNS 
scenarios";
+    }
+    auto adlfs_client =
+        datalake_service_client_->GetFileSystemClient(data.container_name);
+    // ...and dest.container doesn't exist.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage("missing-container/path"),
+        fs()->Move(data.ObjectPath(), "missing-container/path"));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        IOError, HasMissingParentDirMessage(data.Path("missing-subdir/file")),
+        fs()->Move(data.ObjectPath(), data.Path("missing-subdir/file")));
+    AssertFileInfo(fs(), data.ObjectPath(), FileType::File);
+
+    // src is a file and dest does not exist
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("file1/"), ENOENT);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("file1/"), ENOTDIR);
+    // "file0" exists
+
+    // src is a file and dest exists (as a file)
+    CreateFile(adlfs_client, PreexistingData::kObjectName, 
PreexistingData::kLoremIpsum);
+    CreateFile(adlfs_client, "file1", PreexistingData::kLoremIpsum);
+    ASSERT_MOVE_OK(data.ObjectPath(), data.Path("file0"));
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1"), data.Path("file0/"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file1/"), data.Path("file0/"), ENOTDIR);
+    // "file0" and "file1" exist
+
+    // src is a file and dest exists (as an empty dir)
+    CreateDirectory(adlfs_client, "subdir0");
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR);
+
+    // src is a file and dest exists (as a non-empty dir)
+    CreateFile(adlfs_client, "subdir0/file-at-subdir");
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0"), ENOTDIR);
+    ASSERT_MOVE(data.Path("file0"), data.Path("subdir0/"), EISDIR);
+    ASSERT_MOVE(data.Path("file0/"), data.Path("subdir0/"), ENOTDIR);
+    // "subdir0/file-at-subdir" exists
+
+    // src is a directory and dest does not exist
+    CreateDirectory(adlfs_client, "subdir0");
+    ASSERT_MOVE_OK(data.Path("subdir0"), data.Path("subdir1"));
+    ASSERT_MOVE_OK(data.Path("subdir1/"), data.Path("subdir2"));
+    ASSERT_MOVE_OK(data.Path("subdir2"), data.Path("subdir3/"));
+    ASSERT_MOVE_OK(data.Path("subdir3/"), data.Path("subdir4/"));
+    AssertFileInfo(fs(), data.Path("subdir4/file-at-subdir"), FileType::File);
+    // "subdir4/file-at-subdir" exists
+
+    // src is a directory and dest exists as an empty directory
+    CreateDirectory(adlfs_client, "subdir0");
+    CreateDirectory(adlfs_client, "subdir1");
+    CreateDirectory(adlfs_client, "subdir2");
+    CreateDirectory(adlfs_client, "subdir3");
+    ASSERT_MOVE_OK(data.Path("subdir4"), data.Path("subdir0"));
+    ASSERT_MOVE_OK(data.Path("subdir0/"), data.Path("subdir1"));
+    ASSERT_MOVE_OK(data.Path("subdir1"), data.Path("subdir2/"));
+    ASSERT_MOVE_OK(data.Path("subdir2/"), data.Path("subdir3/"));
+    AssertFileInfo(fs(), data.Path("subdir3/file-at-subdir"), FileType::File);
+    // "subdir3/file-at-subdir" exists
+
+    // src is a directory and dest exists as a non-empty directory
+    CreateDirectory(adlfs_client, "subdir0");
+    CreateDirectory(adlfs_client, "subdir1");
+    CreateDirectory(adlfs_client, "subdir2");

Review Comment:
   It seems that they are needless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to