This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9746efca6 refactor(hdfs-native): restructure HdfsNativeBackend and 
introduce HdfsNativeCore (#6737)
9746efca6 is described below

commit 9746efca6aaa95776d467e7e5e88c5ec93dfd00d
Author: Kingsword <[email protected]>
AuthorDate: Fri Oct 24 23:19:40 2025 +0800

    refactor(hdfs-native): restructure HdfsNativeBackend and introduce 
HdfsNativeCore (#6737)
---
 core/src/services/hdfs_native/backend.rs | 221 ++++++++-----------------------
 core/src/services/hdfs_native/core.rs    | 214 ++++++++++++++++++++++++++++++
 core/src/services/hdfs_native/delete.rs  |  16 +--
 core/src/services/hdfs_native/mod.rs     |   1 +
 4 files changed, 272 insertions(+), 180 deletions(-)

diff --git a/core/src/services/hdfs_native/backend.rs 
b/core/src/services/hdfs_native/backend.rs
index 70136e6a1..39ac787b6 100644
--- a/core/src/services/hdfs_native/backend.rs
+++ b/core/src/services/hdfs_native/backend.rs
@@ -19,11 +19,10 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-use hdfs_native::HdfsError;
-use hdfs_native::WriteOptions;
 use log::debug;
 
 use super::HDFS_NATIVE_SCHEME;
+use super::core::HdfsNativeCore;
 use super::delete::HdfsNativeDeleter;
 use super::error::parse_hdfs_error;
 use super::lister::HdfsNativeLister;
@@ -110,9 +109,37 @@ impl Builder for HdfsNativeBuilder {
 
         // need to check if root dir exists, create if not
         Ok(HdfsNativeBackend {
-            root,
-            client: Arc::new(client),
-            enable_append: self.config.enable_append,
+            core: Arc::new(HdfsNativeCore {
+                info: {
+                    let am = AccessorInfo::default();
+                    am.set_scheme(HDFS_NATIVE_SCHEME)
+                        .set_root(&root)
+                        .set_native_capability(Capability {
+                            stat: true,
+
+                            read: true,
+
+                            write: true,
+                            write_can_append: self.config.enable_append,
+
+                            create_dir: true,
+                            delete: true,
+
+                            list: true,
+
+                            rename: true,
+
+                            shared: true,
+
+                            ..Default::default()
+                        });
+
+                    am.into()
+                },
+                root,
+                client: Arc::new(client),
+                enable_append: self.config.enable_append,
+            }),
         })
     }
 }
@@ -128,15 +155,9 @@ impl Builder for HdfsNativeBuilder {
 /// Backend for hdfs-native services.
 #[derive(Debug, Clone)]
 pub struct HdfsNativeBackend {
-    pub root: String,
-    pub client: Arc<hdfs_native::Client>,
-    enable_append: bool,
+    core: Arc<HdfsNativeCore>,
 }
 
-/// hdfs_native::Client is thread-safe.
-unsafe impl Send for HdfsNativeBackend {}
-unsafe impl Sync for HdfsNativeBackend {}
-
 impl Access for HdfsNativeBackend {
     type Reader = HdfsNativeReader;
     type Writer = HdfsNativeWriter;
@@ -144,118 +165,29 @@ impl Access for HdfsNativeBackend {
     type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;
 
     fn info(&self) -> Arc<AccessorInfo> {
-        let am = AccessorInfo::default();
-        am.set_scheme(HDFS_NATIVE_SCHEME)
-            .set_root(&self.root)
-            .set_native_capability(Capability {
-                stat: true,
-
-                read: true,
-
-                write: true,
-                write_can_append: self.enable_append,
-
-                create_dir: true,
-                delete: true,
-
-                list: true,
-
-                rename: true,
-
-                shared: true,
-
-                ..Default::default()
-            });
-
-        am.into()
+        self.core.info.clone()
     }
 
     async fn create_dir(&self, path: &str, _args: OpCreateDir) -> 
Result<RpCreateDir> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        self.client
-            .mkdirs(&p, 0o777, true)
-            .await
-            .map_err(parse_hdfs_error)?;
-
+        self.core.hdfs_create_dir(path).await?;
         Ok(RpCreateDir::default())
     }
 
     async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let status: hdfs_native::client::FileStatus = self
-            .client
-            .get_file_info(&p)
-            .await
-            .map_err(parse_hdfs_error)?;
-
-        let mode = if status.isdir {
-            EntryMode::DIR
-        } else {
-            EntryMode::FILE
-        };
-
-        let mut metadata = Metadata::new(mode);
-        metadata
-            .set_last_modified(Timestamp::from_millisecond(
-                status.modification_time as i64,
-            )?)
-            .set_content_length(status.length as u64);
-
-        Ok(RpStat::new(metadata))
+        let m = self.core.hdfs_stat(path).await?;
+        Ok(RpStat::new(m))
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let p = build_rooted_abs_path(&self.root, path);
+        let (f, offset, size) = self.core.hdfs_read(path, &args).await?;
 
-        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
-
-        let r = HdfsNativeReader::new(
-            f,
-            args.range().offset() as _,
-            args.range().size().unwrap_or(u64::MAX) as _,
-        );
+        let r = HdfsNativeReader::new(f, offset as _, size as _);
 
         Ok((RpRead::new(), r))
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let target_path = build_rooted_abs_path(&self.root, path);
-        let mut initial_size = 0;
-
-        let target_exists = match 
self.client.get_file_info(&target_path).await {
-            Ok(status) => {
-                initial_size = status.length as u64;
-                true
-            }
-            Err(err) => match &err {
-                HdfsError::FileNotFound(_) => false,
-                _ => return Err(parse_hdfs_error(err)),
-            },
-        };
-
-        let f = if target_exists {
-            if args.append() {
-                assert!(self.enable_append, "append is not enabled");
-                self.client
-                    .append(&target_path)
-                    .await
-                    .map_err(parse_hdfs_error)?
-            } else {
-                initial_size = 0;
-                self.client
-                    .create(&target_path, 
WriteOptions::default().overwrite(true))
-                    .await
-                    .map_err(parse_hdfs_error)?
-            }
-        } else {
-            initial_size = 0;
-            self.client
-                .create(&target_path, WriteOptions::default())
-                .await
-                .map_err(parse_hdfs_error)?
-        };
+        let (f, initial_size) = self.core.hdfs_write(path, &args).await?;
 
         Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
     }
@@ -263,74 +195,27 @@ impl Access for HdfsNativeBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            
oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
+            
oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::clone(&self.core))),
         ))
     }
 
     async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let p: String = build_rooted_abs_path(&self.root, path);
-
-        let isdir = match self.client.get_file_info(&p).await {
-            Ok(status) => status.isdir,
-            Err(err) => {
-                return match &err {
-                    HdfsError::FileNotFound(_) => Ok((RpList::default(), 
None)),
-                    _ => Err(parse_hdfs_error(err)),
-                };
-            }
-        };
-        let current_path = if isdir {
-            if !path.ends_with("/") {
-                Some(path.to_string() + "/")
-            } else {
-                Some(path.to_string())
-            }
-        } else {
-            None
-        };
-
-        Ok((
-            RpList::default(),
-            Some(HdfsNativeLister::new(
-                &self.root,
-                &self.client,
-                &p,
-                current_path,
+        match self.core.hdfs_list(path).await? {
+            Some((p, current_path)) => Ok((
+                RpList::default(),
+                Some(HdfsNativeLister::new(
+                    &self.core.root,
+                    &self.core.client,
+                    &p,
+                    current_path,
+                )),
             )),
-        ))
+            None => Ok((RpList::default(), None)),
+        }
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
-        let from_path = build_rooted_abs_path(&self.root, from);
-        let to_path = build_rooted_abs_path(&self.root, to);
-        match self.client.get_file_info(&to_path).await {
-            Ok(status) => {
-                if status.isdir {
-                    return Err(Error::new(ErrorKind::IsADirectory, "path 
should be a file")
-                        .with_context("input", &to_path));
-                } else {
-                    self.client
-                        .delete(&to_path, true)
-                        .await
-                        .map_err(parse_hdfs_error)?;
-                }
-            }
-            Err(err) => match &err {
-                HdfsError::FileNotFound(_) => {
-                    self.client
-                        .create(&to_path, 
WriteOptions::default().create_parent(true))
-                        .await
-                        .map_err(parse_hdfs_error)?;
-                }
-                _ => return Err(parse_hdfs_error(err)),
-            },
-        };
-
-        self.client
-            .rename(&from_path, &to_path, true)
-            .await
-            .map_err(parse_hdfs_error)?;
-
+        self.core.hdfs_rename(from, to).await?;
         Ok(RpRename::default())
     }
 }
diff --git a/core/src/services/hdfs_native/core.rs 
b/core/src/services/hdfs_native/core.rs
new file mode 100644
index 000000000..353458ee5
--- /dev/null
+++ b/core/src/services/hdfs_native/core.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use hdfs_native::HdfsError;
+use hdfs_native::WriteOptions;
+
+use super::error::parse_hdfs_error;
+use crate::raw::*;
+use crate::*;
+
+/// HdfsNativeCore contains code that directly interacts with HDFS Native 
client.
+#[derive(Clone)]
+pub struct HdfsNativeCore {
+    pub info: Arc<AccessorInfo>,
+    pub root: String,
+    pub client: Arc<hdfs_native::Client>,
+    pub enable_append: bool,
+}
+
+impl Debug for HdfsNativeCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HdfsNativeCore")
+            .field("root", &self.root)
+            .field("enable_append", &self.enable_append)
+            .finish_non_exhaustive()
+    }
+}
+
+impl HdfsNativeCore {
+    pub async fn hdfs_create_dir(&self, path: &str) -> Result<()> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        self.client
+            .mkdirs(&p, 0o777, true)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        Ok(())
+    }
+
+    pub async fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let status: hdfs_native::client::FileStatus = self
+            .client
+            .get_file_info(&p)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        let mode = if status.isdir {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+
+        let mut metadata = Metadata::new(mode);
+        metadata
+            .set_last_modified(Timestamp::from_millisecond(
+                status.modification_time as i64,
+            )?)
+            .set_content_length(status.length as u64);
+
+        Ok(metadata)
+    }
+
+    pub async fn hdfs_read(
+        &self,
+        path: &str,
+        args: &OpRead,
+    ) -> Result<(hdfs_native::file::FileReader, u64, u64)> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
+
+        let offset = args.range().offset();
+        let size = args.range().size().unwrap_or(u64::MAX);
+
+        Ok((f, offset, size))
+    }
+
+    pub async fn hdfs_write(
+        &self,
+        path: &str,
+        args: &OpWrite,
+    ) -> Result<(hdfs_native::file::FileWriter, u64)> {
+        let target_path = build_rooted_abs_path(&self.root, path);
+        let mut initial_size = 0;
+
+        let target_exists = match 
self.client.get_file_info(&target_path).await {
+            Ok(status) => {
+                initial_size = status.length as u64;
+                true
+            }
+            Err(err) => match &err {
+                HdfsError::FileNotFound(_) => false,
+                _ => return Err(parse_hdfs_error(err)),
+            },
+        };
+
+        let f = if target_exists {
+            if args.append() {
+                assert!(self.enable_append, "append is not enabled");
+                self.client
+                    .append(&target_path)
+                    .await
+                    .map_err(parse_hdfs_error)?
+            } else {
+                initial_size = 0;
+                self.client
+                    .create(&target_path, 
WriteOptions::default().overwrite(true))
+                    .await
+                    .map_err(parse_hdfs_error)?
+            }
+        } else {
+            initial_size = 0;
+            self.client
+                .create(&target_path, WriteOptions::default())
+                .await
+                .map_err(parse_hdfs_error)?
+        };
+
+        Ok((f, initial_size))
+    }
+
+    pub async fn hdfs_delete(&self, path: &str) -> Result<()> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        self.client
+            .delete(&p, true)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        Ok(())
+    }
+
+    pub async fn hdfs_list(&self, path: &str) -> Result<Option<(String, 
Option<String>)>> {
+        let p: String = build_rooted_abs_path(&self.root, path);
+
+        let isdir = match self.client.get_file_info(&p).await {
+            Ok(status) => status.isdir,
+            Err(err) => {
+                return match &err {
+                    HdfsError::FileNotFound(_) => Ok(None),
+                    _ => Err(parse_hdfs_error(err)),
+                };
+            }
+        };
+
+        let current_path = if isdir {
+            if !path.ends_with("/") {
+                Some(path.to_string() + "/")
+            } else {
+                Some(path.to_string())
+            }
+        } else {
+            None
+        };
+
+        Ok(Some((p, current_path)))
+    }
+
+    pub async fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
+        let from_path = build_rooted_abs_path(&self.root, from);
+        let to_path = build_rooted_abs_path(&self.root, to);
+
+        match self.client.get_file_info(&to_path).await {
+            Ok(status) => {
+                if status.isdir {
+                    return Err(Error::new(ErrorKind::IsADirectory, "path 
should be a file")
+                        .with_context("input", &to_path));
+                } else {
+                    self.client
+                        .delete(&to_path, true)
+                        .await
+                        .map_err(parse_hdfs_error)?;
+                }
+            }
+            Err(err) => match &err {
+                HdfsError::FileNotFound(_) => {
+                    self.client
+                        .create(&to_path, 
WriteOptions::default().create_parent(true))
+                        .await
+                        .map_err(parse_hdfs_error)?;
+                }
+                _ => return Err(parse_hdfs_error(err)),
+            },
+        };
+
+        self.client
+            .rename(&from_path, &to_path, true)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/hdfs_native/delete.rs 
b/core/src/services/hdfs_native/delete.rs
index 84888b0e8..648f15346 100644
--- a/core/src/services/hdfs_native/delete.rs
+++ b/core/src/services/hdfs_native/delete.rs
@@ -17,31 +17,23 @@
 
 use std::sync::Arc;
 
-use super::backend::HdfsNativeBackend;
-use super::error::parse_hdfs_error;
+use super::core::HdfsNativeCore;
 use crate::raw::*;
 use crate::*;
 
 pub struct HdfsNativeDeleter {
-    core: Arc<HdfsNativeBackend>,
+    core: Arc<HdfsNativeCore>,
 }
 
 impl HdfsNativeDeleter {
-    pub fn new(core: Arc<HdfsNativeBackend>) -> Self {
+    pub fn new(core: Arc<HdfsNativeCore>) -> Self {
         Self { core }
     }
 }
 
 impl oio::OneShotDelete for HdfsNativeDeleter {
     async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
-        let p = build_rooted_abs_path(&self.core.root, &path);
-
-        self.core
-            .client
-            .delete(&p, true)
-            .await
-            .map_err(parse_hdfs_error)?;
-
+        self.core.hdfs_delete(&path).await?;
         Ok(())
     }
 }
diff --git a/core/src/services/hdfs_native/mod.rs 
b/core/src/services/hdfs_native/mod.rs
index 625971426..dfc3fe7b1 100644
--- a/core/src/services/hdfs_native/mod.rs
+++ b/core/src/services/hdfs_native/mod.rs
@@ -17,6 +17,7 @@
 
 /// Default scheme for hdfs_native service.
 pub(super) const HDFS_NATIVE_SCHEME: &str = "hdfs_native";
+mod core;
 mod delete;
 mod error;
 mod lister;

Reply via email to