This is an automated email from the ASF dual-hosted git repository. kingsword09 pushed a commit to branch refactor-hdfs-native in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 3262da506d88df55f50db4e18246aeeccd0c2392 Author: Kingsword <[email protected]> AuthorDate: Fri Oct 24 22:12:45 2025 +0800 refactor(hdfs-native): restructure HdfsNativeBackend and introduce HdfsNativeCore --- core/src/services/hdfs_native/backend.rs | 221 ++++++++----------------------- core/src/services/hdfs_native/core.rs | 214 ++++++++++++++++++++++++++++++ core/src/services/hdfs_native/delete.rs | 16 +-- core/src/services/hdfs_native/mod.rs | 1 + 4 files changed, 272 insertions(+), 180 deletions(-) diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 70136e6a1..39ac787b6 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -19,11 +19,10 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use hdfs_native::HdfsError; -use hdfs_native::WriteOptions; use log::debug; use super::HDFS_NATIVE_SCHEME; +use super::core::HdfsNativeCore; use super::delete::HdfsNativeDeleter; use super::error::parse_hdfs_error; use super::lister::HdfsNativeLister; @@ -110,9 +109,37 @@ impl Builder for HdfsNativeBuilder { // need to check if root dir exists, create if not Ok(HdfsNativeBackend { - root, - client: Arc::new(client), - enable_append: self.config.enable_append, + core: Arc::new(HdfsNativeCore { + info: { + let am = AccessorInfo::default(); + am.set_scheme(HDFS_NATIVE_SCHEME) + .set_root(&root) + .set_native_capability(Capability { + stat: true, + + read: true, + + write: true, + write_can_append: self.config.enable_append, + + create_dir: true, + delete: true, + + list: true, + + rename: true, + + shared: true, + + ..Default::default() + }); + + am.into() + }, + root, + client: Arc::new(client), + enable_append: self.config.enable_append, + }), }) } } @@ -128,15 +155,9 @@ impl Builder for HdfsNativeBuilder { /// Backend for hdfs-native services. #[derive(Debug, Clone)] pub struct HdfsNativeBackend { - pub root: String, - pub client: Arc<hdfs_native::Client>, - enable_append: bool, + core: Arc<HdfsNativeCore>, } -/// hdfs_native::Client is thread-safe. -unsafe impl Send for HdfsNativeBackend {} -unsafe impl Sync for HdfsNativeBackend {} - impl Access for HdfsNativeBackend { type Reader = HdfsNativeReader; type Writer = HdfsNativeWriter; @@ -144,118 +165,29 @@ impl Access for HdfsNativeBackend { type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>; fn info(&self) -> Arc<AccessorInfo> { - let am = AccessorInfo::default(); - am.set_scheme(HDFS_NATIVE_SCHEME) - .set_root(&self.root) - .set_native_capability(Capability { - stat: true, - - read: true, - - write: true, - write_can_append: self.enable_append, - - create_dir: true, - delete: true, - - list: true, - - rename: true, - - shared: true, - - ..Default::default() - }); - - am.into() + self.core.info.clone() } async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> { - let p = build_rooted_abs_path(&self.root, path); - - self.client - .mkdirs(&p, 0o777, true) - .await - .map_err(parse_hdfs_error)?; - + self.core.hdfs_create_dir(path).await?; Ok(RpCreateDir::default()) } async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> { - let p = build_rooted_abs_path(&self.root, path); - - let status: hdfs_native::client::FileStatus = self - .client - .get_file_info(&p) - .await - .map_err(parse_hdfs_error)?; - - let mode = if status.isdir { - EntryMode::DIR - } else { - EntryMode::FILE - }; - - let mut metadata = Metadata::new(mode); - metadata - .set_last_modified(Timestamp::from_millisecond( - status.modification_time as i64, - )?) - .set_content_length(status.length as u64); - - Ok(RpStat::new(metadata)) + let m = self.core.hdfs_stat(path).await?; + Ok(RpStat::new(m)) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let p = build_rooted_abs_path(&self.root, path); + let (f, offset, size) = self.core.hdfs_read(path, &args).await?; - let f = self.client.read(&p).await.map_err(parse_hdfs_error)?; - - let r = HdfsNativeReader::new( - f, - args.range().offset() as _, - args.range().size().unwrap_or(u64::MAX) as _, - ); + let r = HdfsNativeReader::new(f, offset as _, size as _); Ok((RpRead::new(), r)) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let target_path = build_rooted_abs_path(&self.root, path); - let mut initial_size = 0; - - let target_exists = match self.client.get_file_info(&target_path).await { - Ok(status) => { - initial_size = status.length as u64; - true - } - Err(err) => match &err { - HdfsError::FileNotFound(_) => false, - _ => return Err(parse_hdfs_error(err)), - }, - }; - - let f = if target_exists { - if args.append() { - assert!(self.enable_append, "append is not enabled"); - self.client - .append(&target_path) - .await - .map_err(parse_hdfs_error)? - } else { - initial_size = 0; - self.client - .create(&target_path, WriteOptions::default().overwrite(true)) - .await - .map_err(parse_hdfs_error)? - } - } else { - initial_size = 0; - self.client - .create(&target_path, WriteOptions::default()) - .await - .map_err(parse_hdfs_error)? - }; + let (f, initial_size) = self.core.hdfs_write(path, &args).await?; Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size))) } @@ -263,74 +195,27 @@ impl Access for HdfsNativeBackend { async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { Ok(( RpDelete::default(), - oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))), + oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::clone(&self.core))), )) } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { - let p: String = build_rooted_abs_path(&self.root, path); - - let isdir = match self.client.get_file_info(&p).await { - Ok(status) => status.isdir, - Err(err) => { - return match &err { - HdfsError::FileNotFound(_) => Ok((RpList::default(), None)), - _ => Err(parse_hdfs_error(err)), - }; - } - }; - let current_path = if isdir { - if !path.ends_with("/") { - Some(path.to_string() + "/") - } else { - Some(path.to_string()) - } - } else { - None - }; - - Ok(( - RpList::default(), - Some(HdfsNativeLister::new( - &self.root, - &self.client, - &p, - current_path, + match self.core.hdfs_list(path).await? { + Some((p, current_path)) => Ok(( + RpList::default(), + Some(HdfsNativeLister::new( + &self.core.root, + &self.core.client, + &p, + current_path, + )), )), - )) + None => Ok((RpList::default(), None)), + } } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> { - let from_path = build_rooted_abs_path(&self.root, from); - let to_path = build_rooted_abs_path(&self.root, to); - match self.client.get_file_info(&to_path).await { - Ok(status) => { - if status.isdir { - return Err(Error::new(ErrorKind::IsADirectory, "path should be a file") - .with_context("input", &to_path)); - } else { - self.client - .delete(&to_path, true) - .await - .map_err(parse_hdfs_error)?; - } - } - Err(err) => match &err { - HdfsError::FileNotFound(_) => { - self.client - .create(&to_path, WriteOptions::default().create_parent(true)) - .await - .map_err(parse_hdfs_error)?; - } - _ => return Err(parse_hdfs_error(err)), - }, - }; - - self.client - .rename(&from_path, &to_path, true) - .await - .map_err(parse_hdfs_error)?; - + self.core.hdfs_rename(from, to).await?; Ok(RpRename::default()) } } diff --git a/core/src/services/hdfs_native/core.rs b/core/src/services/hdfs_native/core.rs new file mode 100644 index 000000000..353458ee5 --- /dev/null +++ b/core/src/services/hdfs_native/core.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use hdfs_native::HdfsError; +use hdfs_native::WriteOptions; + +use super::error::parse_hdfs_error; +use crate::raw::*; +use crate::*; + +/// HdfsNativeCore contains code that directly interacts with HDFS Native client. +#[derive(Clone)] +pub struct HdfsNativeCore { + pub info: Arc<AccessorInfo>, + pub root: String, + pub client: Arc<hdfs_native::Client>, + pub enable_append: bool, +} + +impl Debug for HdfsNativeCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HdfsNativeCore") + .field("root", &self.root) + .field("enable_append", &self.enable_append) + .finish_non_exhaustive() + } +} + +impl HdfsNativeCore { + pub async fn hdfs_create_dir(&self, path: &str) -> Result<()> { + let p = build_rooted_abs_path(&self.root, path); + + self.client + .mkdirs(&p, 0o777, true) + .await + .map_err(parse_hdfs_error)?; + + Ok(()) + } + + pub async fn hdfs_stat(&self, path: &str) -> Result<Metadata> { + let p = build_rooted_abs_path(&self.root, path); + + let status: hdfs_native::client::FileStatus = self + .client + .get_file_info(&p) + .await + .map_err(parse_hdfs_error)?; + + let mode = if status.isdir { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + let mut metadata = Metadata::new(mode); + metadata + .set_last_modified(Timestamp::from_millisecond( + status.modification_time as i64, + )?) + .set_content_length(status.length as u64); + + Ok(metadata) + } + + pub async fn hdfs_read( + &self, + path: &str, + args: &OpRead, + ) -> Result<(hdfs_native::file::FileReader, u64, u64)> { + let p = build_rooted_abs_path(&self.root, path); + + let f = self.client.read(&p).await.map_err(parse_hdfs_error)?; + + let offset = args.range().offset(); + let size = args.range().size().unwrap_or(u64::MAX); + + Ok((f, offset, size)) + } + + pub async fn hdfs_write( + &self, + path: &str, + args: &OpWrite, + ) -> Result<(hdfs_native::file::FileWriter, u64)> { + let target_path = build_rooted_abs_path(&self.root, path); + let mut initial_size = 0; + + let target_exists = match self.client.get_file_info(&target_path).await { + Ok(status) => { + initial_size = status.length as u64; + true + } + Err(err) => match &err { + HdfsError::FileNotFound(_) => false, + _ => return Err(parse_hdfs_error(err)), + }, + }; + + let f = if target_exists { + if args.append() { + assert!(self.enable_append, "append is not enabled"); + self.client + .append(&target_path) + .await + .map_err(parse_hdfs_error)? + } else { + initial_size = 0; + self.client + .create(&target_path, WriteOptions::default().overwrite(true)) + .await + .map_err(parse_hdfs_error)? + } + } else { + initial_size = 0; + self.client + .create(&target_path, WriteOptions::default()) + .await + .map_err(parse_hdfs_error)? + }; + + Ok((f, initial_size)) + } + + pub async fn hdfs_delete(&self, path: &str) -> Result<()> { + let p = build_rooted_abs_path(&self.root, path); + + self.client + .delete(&p, true) + .await + .map_err(parse_hdfs_error)?; + + Ok(()) + } + + pub async fn hdfs_list(&self, path: &str) -> Result<Option<(String, Option<String>)>> { + let p: String = build_rooted_abs_path(&self.root, path); + + let isdir = match self.client.get_file_info(&p).await { + Ok(status) => status.isdir, + Err(err) => { + return match &err { + HdfsError::FileNotFound(_) => Ok(None), + _ => Err(parse_hdfs_error(err)), + }; + } + }; + + let current_path = if isdir { + if !path.ends_with("/") { + Some(path.to_string() + "/") + } else { + Some(path.to_string()) + } + } else { + None + }; + + Ok(Some((p, current_path))) + } + + pub async fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> { + let from_path = build_rooted_abs_path(&self.root, from); + let to_path = build_rooted_abs_path(&self.root, to); + + match self.client.get_file_info(&to_path).await { + Ok(status) => { + if status.isdir { + return Err(Error::new(ErrorKind::IsADirectory, "path should be a file") + .with_context("input", &to_path)); + } else { + self.client + .delete(&to_path, true) + .await + .map_err(parse_hdfs_error)?; + } + } + Err(err) => match &err { + HdfsError::FileNotFound(_) => { + self.client + .create(&to_path, WriteOptions::default().create_parent(true)) + .await + .map_err(parse_hdfs_error)?; + } + _ => return Err(parse_hdfs_error(err)), + }, + }; + + self.client + .rename(&from_path, &to_path, true) + .await + .map_err(parse_hdfs_error)?; + + Ok(()) + } +} diff --git a/core/src/services/hdfs_native/delete.rs b/core/src/services/hdfs_native/delete.rs index 84888b0e8..648f15346 100644 --- a/core/src/services/hdfs_native/delete.rs +++ b/core/src/services/hdfs_native/delete.rs @@ -17,31 +17,23 @@ use std::sync::Arc; -use super::backend::HdfsNativeBackend; -use super::error::parse_hdfs_error; +use super::core::HdfsNativeCore; use crate::raw::*; use crate::*; pub struct HdfsNativeDeleter { - core: Arc<HdfsNativeBackend>, + core: Arc<HdfsNativeCore>, } impl HdfsNativeDeleter { - pub fn new(core: Arc<HdfsNativeBackend>) -> Self { + pub fn new(core: Arc<HdfsNativeCore>) -> Self { Self { core } } } impl oio::OneShotDelete for HdfsNativeDeleter { async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { - let p = build_rooted_abs_path(&self.core.root, &path); - - self.core - .client - .delete(&p, true) - .await - .map_err(parse_hdfs_error)?; - + self.core.hdfs_delete(&path).await?; Ok(()) } } diff --git a/core/src/services/hdfs_native/mod.rs b/core/src/services/hdfs_native/mod.rs index 625971426..dfc3fe7b1 100644 --- a/core/src/services/hdfs_native/mod.rs +++ b/core/src/services/hdfs_native/mod.rs @@ -17,6 +17,7 @@ /// Default scheme for hdfs_native service. pub(super) const HDFS_NATIVE_SCHEME: &str = "hdfs_native"; +mod core; mod delete; mod error; mod lister;
