This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8dfd6d94 feat(services/azdfs): support rename (#1929)
8dfd6d94 is described below
commit 8dfd6d942bf8b0c8efd42927e83d6bdbbdaf1c11
Author: Suyan <[email protected]>
AuthorDate: Thu Apr 13 01:09:01 2023 +0800
feat(services/azdfs): support rename (#1929)
* feat(services/azdfs): support rename
Signed-off-by: suyanhanx <[email protected]>
* try fix content-length
Signed-off-by: suyanhanx <[email protected]>
* ensure parent path before rename
Signed-off-by: suyanhanx <[email protected]>
* fix path
Signed-off-by: suyanhanx <[email protected]>
* fix ensure parent path
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
core/src/services/azdfs/backend.rs | 30 ++++++++++++++++++++-
core/src/services/azdfs/core.rs | 54 ++++++++++++++++++++++++++++++++++++++
2 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/core/src/services/azdfs/backend.rs
b/core/src/services/azdfs/backend.rs
index 4bdf402e..825e7466 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -57,6 +57,7 @@ const KNOWN_AZDFS_ENDPOINT_SUFFIX: &[&str] = &[
///
/// - [x] read
/// - [x] write
+/// - [x] rename
/// - [x] list
/// - [ ] ~~scan~~
/// - [ ] presign
@@ -308,7 +309,10 @@ impl Accessor for AzdfsBackend {
.set_root(&self.core.root)
.set_name(&self.core.filesystem)
.set_capabilities(
- AccessorCapability::Read | AccessorCapability::Write |
AccessorCapability::List,
+ AccessorCapability::Read
+ | AccessorCapability::Write
+ | AccessorCapability::Rename
+ | AccessorCapability::List,
)
.set_hints(AccessorHint::ReadStreamable);
@@ -369,6 +373,30 @@ impl Accessor for AzdfsBackend {
))
}
+ async fn rename(&self, from: &str, to: &str, _args: OpRename) ->
Result<RpRename> {
+ if let Some(resp) = self.core.azdfs_ensure_parent_path(to).await? {
+ let status = resp.status();
+ match status {
+ StatusCode::CREATED | StatusCode::CONFLICT => {
+ resp.into_body().consume().await?;
+ }
+ _ => return Err(parse_error(resp).await?),
+ }
+ }
+
+ let resp = self.core.azdfs_rename(from, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED => {
+ resp.into_body().consume().await?;
+ Ok(RpRename::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
// Stat root always returns a DIR.
if path == "/" {
diff --git a/core/src/services/azdfs/core.rs b/core/src/services/azdfs/core.rs
index 8386c4ba..c0dc70d1 100644
--- a/core/src/services/azdfs/core.rs
+++ b/core/src/services/azdfs/core.rs
@@ -32,6 +32,8 @@ use reqsign::AzureStorageSigner;
use crate::raw::*;
use crate::*;
+const X_MS_RENAME_SOURCE: &str = "x-ms-rename-source";
+
pub struct AzdfsCore {
pub filesystem: String,
pub root: String,
@@ -161,6 +163,30 @@ impl AzdfsCore {
Ok(req)
}
+ pub async fn azdfs_rename(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let source = build_abs_path(&self.root, from);
+ let target = build_abs_path(&self.root, to);
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&target)
+ );
+
+ let mut req = Request::put(&url)
+ .header(
+ X_MS_RENAME_SOURCE,
+ format!("/{}/{}", self.filesystem,
percent_encode_path(&source)),
+ )
+ .header(CONTENT_LENGTH, 0)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
/// ref:
https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
pub fn azdfs_update_request(
&self,
@@ -267,4 +293,32 @@ impl AzdfsCore {
self.sign(&mut req).await?;
self.send(req).await
}
+
+ pub async fn azdfs_ensure_parent_path(
+ &self,
+ path: &str,
+ ) -> Result<Option<Response<IncomingAsyncBody>>> {
+ let abs_target_path = path.trim_end_matches('/').to_string();
+ let abs_target_path = abs_target_path.as_str();
+ let mut parts: Vec<&str> = abs_target_path
+ .split('/')
+ .filter(|x| !x.is_empty())
+ .collect();
+
+ if !parts.is_empty() {
+ parts.pop();
+ }
+
+ if !parts.is_empty() {
+ let parent_path = parts.join("/");
+ let mut req =
+ self.azdfs_create_request(&parent_path, "directory", None,
None, AsyncBody::Empty)?;
+
+ self.sign(&mut req).await?;
+
+ Ok(Some(self.send(req).await?))
+ } else {
+ Ok(None)
+ }
+ }
}