This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new a3c2e288f refactor(hdfs): restructure HdfsBackend and introduce HdfsCore (#6736)
a3c2e288f is described below

commit a3c2e288f341ae6cd8ae46ec2dd55fdab36e760a
Author: Kingsword <[email protected]>
AuthorDate: Fri Oct 24 23:19:12 2025 +0800

    refactor(hdfs): restructure HdfsBackend and introduce HdfsCore (#6736)
---
 core/src/services/hdfs/backend.rs | 212 +++++++-------------------------------
 core/src/services/hdfs/core.rs    | 206 ++++++++++++++++++++++++++++++++++++
 core/src/services/hdfs/delete.rs  |   6 +-
 core/src/services/hdfs/mod.rs     |   1 +
 4 files changed, 250 insertions(+), 175 deletions(-)

diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index b3705e4c0..7a70f4e3f 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -18,13 +18,12 @@
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::io;
-use std::io::SeekFrom;
-use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
 
 use super::HDFS_SCHEME;
+use super::core::HdfsCore;
 use super::delete::HdfsDeleter;
 use super::lister::HdfsLister;
 use super::reader::HdfsReader;
@@ -165,35 +164,37 @@ impl Builder for HdfsBuilder {
         }
 
         Ok(HdfsBackend {
-            info: {
-                let am = AccessorInfo::default();
-                am.set_scheme(HDFS_SCHEME)
-                    .set_root(&root)
-                    .set_native_capability(Capability {
-                        stat: true,
+            core: Arc::new(HdfsCore {
+                info: {
+                    let am = AccessorInfo::default();
+                    am.set_scheme(HDFS_SCHEME)
+                        .set_root(&root)
+                        .set_native_capability(Capability {
+                            stat: true,
 
-                        read: true,
+                            read: true,
 
-                        write: true,
-                        write_can_append: self.config.enable_append,
+                            write: true,
+                            write_can_append: self.config.enable_append,
 
-                        create_dir: true,
-                        delete: true,
+                            create_dir: true,
+                            delete: true,
 
-                        list: true,
+                            list: true,
 
-                        rename: true,
+                            rename: true,
 
-                        shared: true,
+                            shared: true,
 
-                        ..Default::default()
-                    });
+                            ..Default::default()
+                        });
 
-                am.into()
-            },
-            root,
-            atomic_write_dir,
-            client: Arc::new(client),
+                    am.into()
+                },
+                root,
+                atomic_write_dir,
+                client: Arc::new(client),
+            }),
         })
     }
 }
@@ -201,16 +202,9 @@ impl Builder for HdfsBuilder {
 /// Backend for hdfs services.
 #[derive(Debug, Clone)]
 pub struct HdfsBackend {
-    pub info: Arc<AccessorInfo>,
-    pub root: String,
-    atomic_write_dir: Option<String>,
-    pub client: Arc<hdrs::Client>,
+    core: Arc<HdfsCore>,
 }
 
-/// hdrs::Client is thread-safe.
-unsafe impl Send for HdfsBackend {}
-unsafe impl Sync for HdfsBackend {}
-
 impl Access for HdfsBackend {
     type Reader = HdfsReader<hdrs::AsyncFile>;
     type Writer = HdfsWriter<hdrs::AsyncFile>;
@@ -218,54 +212,21 @@ impl Access for HdfsBackend {
     type Deleter = oio::OneShotDeleter<HdfsDeleter>;
 
     fn info(&self) -> Arc<AccessorInfo> {
-        self.info.clone()
+        self.core.info.clone()
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        self.client.create_dir(&p).map_err(new_std_io_error)?;
-
+        self.core.hdfs_create_dir(path)?;
         Ok(RpCreateDir::default())
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
-
-        let mode = if meta.is_dir() {
-            EntryMode::DIR
-        } else if meta.is_file() {
-            EntryMode::FILE
-        } else {
-            EntryMode::Unknown
-        };
-        let mut m = Metadata::new(mode);
-        m.set_content_length(meta.len());
-        m.set_last_modified(Timestamp::try_from(meta.modified())?);
-
+        let m = self.core.hdfs_stat(path)?;
         Ok(RpStat::new(m))
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let client = self.client.clone();
-        let mut f = client
-            .open_file()
-            .read(true)
-            .async_open(&p)
-            .await
-            .map_err(new_std_io_error)?;
-
-        if args.range().offset() != 0 {
-            use futures::AsyncSeekExt;
-
-            f.seek(SeekFrom::Start(args.range().offset()))
-                .await
-                .map_err(new_std_io_error)?;
-        }
+        let f = self.core.hdfs_read(path, &args).await?;
 
         Ok((
             RpRead::new(),
@@ -274,50 +235,8 @@ impl Access for HdfsBackend {
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let target_path = build_rooted_abs_path(&self.root, path);
-        let mut initial_size = 0;
-        let target_exists = match self.client.metadata(&target_path) {
-            Ok(meta) => {
-                initial_size = meta.len();
-                true
-            }
-            Err(err) => {
-                if err.kind() != io::ErrorKind::NotFound {
-                    return Err(new_std_io_error(err));
-                }
-                false
-            }
-        };
-
-        let should_append = op.append() && target_exists;
-        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
-            // If the target file exists, we should append to the end of it directly.
-            (!should_append).then_some(build_rooted_abs_path(
-                atomic_write_dir,
-                &build_tmp_path_of(path),
-            ))
-        });
-
-        if !target_exists {
-            let parent = get_parent(&target_path);
-            self.client.create_dir(parent).map_err(new_std_io_error)?;
-        }
-        if !should_append {
-            initial_size = 0;
-        }
-
-        let mut open_options = self.client.open_file();
-        open_options.create(true);
-        if should_append {
-            open_options.append(true);
-        } else {
-            open_options.write(true);
-        }
-
-        let f = open_options
-            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
-            .await
-            .map_err(new_std_io_error)?;
+        let (target_path, tmp_path, f, target_exists, initial_size) =
+            self.core.hdfs_write(path, &op).await?;
 
         Ok((
             RpWrite::new(),
@@ -325,7 +244,7 @@ impl Access for HdfsBackend {
                 target_path,
                 tmp_path,
                 f,
-                Arc::clone(&self.client),
+                Arc::clone(&self.core.client),
                 target_exists,
                 initial_size,
             ),
@@ -335,73 +254,22 @@ impl Access for HdfsBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
+            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::clone(&self.core))),
         ))
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let f = match self.client.read_dir(&p) {
-            Ok(f) => f,
-            Err(e) => {
-                return if e.kind() == io::ErrorKind::NotFound {
-                    Ok((RpList::default(), None))
-                } else {
-                    Err(new_std_io_error(e))
-                };
+        match self.core.hdfs_list(path)? {
+            Some(f) => {
+                let rd = HdfsLister::new(&self.core.root, f, path);
+                Ok((RpList::default(), Some(rd)))
             }
-        };
-
-        let rd = HdfsLister::new(&self.root, f, path);
-
-        Ok((RpList::default(), Some(rd)))
+            None => Ok((RpList::default(), None)),
+        }
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
-        let from_path = build_rooted_abs_path(&self.root, from);
-        self.client.metadata(&from_path).map_err(new_std_io_error)?;
-
-        let to_path = build_rooted_abs_path(&self.root, to);
-        let result = self.client.metadata(&to_path);
-        match result {
-            Err(err) => {
-                // Early return if other error happened.
-                if err.kind() != io::ErrorKind::NotFound {
-                    return Err(new_std_io_error(err));
-                }
-
-                let parent = PathBuf::from(&to_path)
-                    .parent()
-                    .ok_or_else(|| {
-                        Error::new(
-                            ErrorKind::Unexpected,
-                            "path should have parent but not, it must be malformed",
-                        )
-                        .with_context("input", &to_path)
-                    })?
-                    .to_path_buf();
-
-                self.client
-                    .create_dir(&parent.to_string_lossy())
-                    .map_err(new_std_io_error)?;
-            }
-            Ok(metadata) => {
-                if metadata.is_file() {
-                    self.client
-                        .remove_file(&to_path)
-                        .map_err(new_std_io_error)?;
-                } else {
-                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
-                        .with_context("input", &to_path));
-                }
-            }
-        }
-
-        self.client
-            .rename_file(&from_path, &to_path)
-            .map_err(new_std_io_error)?;
-
+        self.core.hdfs_rename(from, to)?;
         Ok(RpRename::new())
     }
 }
diff --git a/core/src/services/hdfs/core.rs b/core/src/services/hdfs/core.rs
new file mode 100644
index 000000000..6e8e08bdc
--- /dev/null
+++ b/core/src/services/hdfs/core.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::io::SeekFrom;
+use std::sync::Arc;
+
+use crate::raw::*;
+use crate::*;
+
+/// HdfsCore contains code that directly interacts with HDFS.
+#[derive(Clone)]
+pub struct HdfsCore {
+    pub info: Arc<AccessorInfo>,
+    pub root: String,
+    pub atomic_write_dir: Option<String>,
+    pub client: Arc<hdrs::Client>,
+}
+
+impl Debug for HdfsCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HdfsCore")
+            .field("root", &self.root)
+            .field("atomic_write_dir", &self.atomic_write_dir)
+            .finish_non_exhaustive()
+    }
+}
+
+impl HdfsCore {
+    pub fn hdfs_create_dir(&self, path: &str) -> Result<()> {
+        let p = build_rooted_abs_path(&self.root, path);
+        self.client.create_dir(&p).map_err(new_std_io_error)?;
+        Ok(())
+    }
+
+    pub fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
+        let p = build_rooted_abs_path(&self.root, path);
+        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
+
+        let mode = if meta.is_dir() {
+            EntryMode::DIR
+        } else if meta.is_file() {
+            EntryMode::FILE
+        } else {
+            EntryMode::Unknown
+        };
+        let mut m = Metadata::new(mode);
+        m.set_content_length(meta.len());
+        m.set_last_modified(Timestamp::try_from(meta.modified())?);
+
+        Ok(m)
+    }
+
+    pub async fn hdfs_read(&self, path: &str, args: &OpRead) -> Result<hdrs::AsyncFile> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let client = self.client.clone();
+        let mut f = client
+            .open_file()
+            .read(true)
+            .async_open(&p)
+            .await
+            .map_err(new_std_io_error)?;
+
+        if args.range().offset() != 0 {
+            use futures::AsyncSeekExt;
+
+            f.seek(SeekFrom::Start(args.range().offset()))
+                .await
+                .map_err(new_std_io_error)?;
+        }
+
+        Ok(f)
+    }
+
+    pub async fn hdfs_write(
+        &self,
+        path: &str,
+        op: &OpWrite,
+    ) -> Result<(String, Option<String>, hdrs::AsyncFile, bool, u64)> {
+        let target_path = build_rooted_abs_path(&self.root, path);
+        let mut initial_size = 0;
+        let target_exists = match self.client.metadata(&target_path) {
+            Ok(meta) => {
+                initial_size = meta.len();
+                true
+            }
+            Err(err) => {
+                if err.kind() != io::ErrorKind::NotFound {
+                    return Err(new_std_io_error(err));
+                }
+                false
+            }
+        };
+
+        let should_append = op.append() && target_exists;
+        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
+            // If the target file exists, we should append to the end of it directly.
+            (!should_append).then_some(build_rooted_abs_path(
+                atomic_write_dir,
+                &build_tmp_path_of(path),
+            ))
+        });
+
+        if !target_exists {
+            let parent = get_parent(&target_path);
+            self.client.create_dir(parent).map_err(new_std_io_error)?;
+        }
+        if !should_append {
+            initial_size = 0;
+        }
+
+        let mut open_options = self.client.open_file();
+        open_options.create(true);
+        if should_append {
+            open_options.append(true);
+        } else {
+            open_options.write(true);
+        }
+
+        let f = open_options
+            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
+            .await
+            .map_err(new_std_io_error)?;
+
+        Ok((target_path, tmp_path, f, target_exists, initial_size))
+    }
+
+    pub fn hdfs_list(&self, path: &str) -> Result<Option<hdrs::Readdir>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        match self.client.read_dir(&p) {
+            Ok(f) => Ok(Some(f)),
+            Err(e) => {
+                if e.kind() == io::ErrorKind::NotFound {
+                    Ok(None)
+                } else {
+                    Err(new_std_io_error(e))
+                }
+            }
+        }
+    }
+
+    pub fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
+        let from_path = build_rooted_abs_path(&self.root, from);
+        self.client.metadata(&from_path).map_err(new_std_io_error)?;
+
+        let to_path = build_rooted_abs_path(&self.root, to);
+        let result = self.client.metadata(&to_path);
+        match result {
+            Err(err) => {
+                // Early return if other error happened.
+                if err.kind() != io::ErrorKind::NotFound {
+                    return Err(new_std_io_error(err));
+                }
+
+                let parent = std::path::PathBuf::from(&to_path)
+                    .parent()
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "path should have parent but not, it must be malformed",
+                        )
+                        .with_context("input", &to_path)
+                    })?
+                    .to_path_buf();
+
+                self.client
+                    .create_dir(&parent.to_string_lossy())
+                    .map_err(new_std_io_error)?;
+            }
+            Ok(metadata) => {
+                if metadata.is_file() {
+                    self.client
+                        .remove_file(&to_path)
+                        .map_err(new_std_io_error)?;
+                } else {
+                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
+                        .with_context("input", &to_path));
+                }
+            }
+        }
+
+        self.client
+            .rename_file(&from_path, &to_path)
+            .map_err(new_std_io_error)?;
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/hdfs/delete.rs b/core/src/services/hdfs/delete.rs
index a95478771..11471d794 100644
--- a/core/src/services/hdfs/delete.rs
+++ b/core/src/services/hdfs/delete.rs
@@ -18,16 +18,16 @@
 use std::io;
 use std::sync::Arc;
 
-use super::backend::HdfsBackend;
+use super::core::HdfsCore;
 use crate::raw::*;
 use crate::*;
 
 pub struct HdfsDeleter {
-    core: Arc<HdfsBackend>,
+    core: Arc<HdfsCore>,
 }
 
 impl HdfsDeleter {
-    pub fn new(core: Arc<HdfsBackend>) -> Self {
+    pub fn new(core: Arc<HdfsCore>) -> Self {
         Self { core }
     }
 }
diff --git a/core/src/services/hdfs/mod.rs b/core/src/services/hdfs/mod.rs
index 240d1474a..9b35636f3 100644
--- a/core/src/services/hdfs/mod.rs
+++ b/core/src/services/hdfs/mod.rs
@@ -17,6 +17,7 @@
 
 /// Default scheme for hdfs service.
 pub(super) const HDFS_SCHEME: &str = "hdfs";
+mod core;
 mod delete;
 mod lister;
 mod reader;

Reply via email to