This is an automated email from the ASF dual-hosted git repository.

kingsword09 pushed a commit to branch refactor-hdfs
in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 7a0415257e0947c644fd1d394c3339dacf2e1cd7
Author: Kingsword <[email protected]>
AuthorDate: Fri Oct 24 21:51:52 2025 +0800

    refactor(hdfs): restructure HdfsBackend and introduce HdfsCore
---
 core/src/services/hdfs/backend.rs | 212 +++++++-------------------------------
 core/src/services/hdfs/core.rs    | 206 ++++++++++++++++++++++++++++++++++++++
 core/src/services/hdfs/delete.rs  |   6 +-
 core/src/services/hdfs/mod.rs     |   1 +
 4 files changed, 250 insertions(+), 175 deletions(-)

diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index b3705e4c0..7a70f4e3f 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -18,13 +18,12 @@
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::io;
-use std::io::SeekFrom;
-use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
 
 use super::HDFS_SCHEME;
+use super::core::HdfsCore;
 use super::delete::HdfsDeleter;
 use super::lister::HdfsLister;
 use super::reader::HdfsReader;
@@ -165,35 +164,37 @@ impl Builder for HdfsBuilder {
         }
 
         Ok(HdfsBackend {
-            info: {
-                let am = AccessorInfo::default();
-                am.set_scheme(HDFS_SCHEME)
-                    .set_root(&root)
-                    .set_native_capability(Capability {
-                        stat: true,
+            core: Arc::new(HdfsCore {
+                info: {
+                    let am = AccessorInfo::default();
+                    am.set_scheme(HDFS_SCHEME)
+                        .set_root(&root)
+                        .set_native_capability(Capability {
+                            stat: true,
 
-                        read: true,
+                            read: true,
 
-                        write: true,
-                        write_can_append: self.config.enable_append,
+                            write: true,
+                            write_can_append: self.config.enable_append,
 
-                        create_dir: true,
-                        delete: true,
+                            create_dir: true,
+                            delete: true,
 
-                        list: true,
+                            list: true,
 
-                        rename: true,
+                            rename: true,
 
-                        shared: true,
+                            shared: true,
 
-                        ..Default::default()
-                    });
+                            ..Default::default()
+                        });
 
-                am.into()
-            },
-            root,
-            atomic_write_dir,
-            client: Arc::new(client),
+                    am.into()
+                },
+                root,
+                atomic_write_dir,
+                client: Arc::new(client),
+            }),
         })
     }
 }
@@ -201,16 +202,9 @@ impl Builder for HdfsBuilder {
 /// Backend for hdfs services.
 #[derive(Debug, Clone)]
 pub struct HdfsBackend {
-    pub info: Arc<AccessorInfo>,
-    pub root: String,
-    atomic_write_dir: Option<String>,
-    pub client: Arc<hdrs::Client>,
+    core: Arc<HdfsCore>,
 }
 
-/// hdrs::Client is thread-safe.
-unsafe impl Send for HdfsBackend {}
-unsafe impl Sync for HdfsBackend {}
-
 impl Access for HdfsBackend {
     type Reader = HdfsReader<hdrs::AsyncFile>;
     type Writer = HdfsWriter<hdrs::AsyncFile>;
@@ -218,54 +212,21 @@ impl Access for HdfsBackend {
     type Deleter = oio::OneShotDeleter<HdfsDeleter>;
 
     fn info(&self) -> Arc<AccessorInfo> {
-        self.info.clone()
+        self.core.info.clone()
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        self.client.create_dir(&p).map_err(new_std_io_error)?;
-
+        self.core.hdfs_create_dir(path)?;
         Ok(RpCreateDir::default())
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
-
-        let mode = if meta.is_dir() {
-            EntryMode::DIR
-        } else if meta.is_file() {
-            EntryMode::FILE
-        } else {
-            EntryMode::Unknown
-        };
-        let mut m = Metadata::new(mode);
-        m.set_content_length(meta.len());
-        m.set_last_modified(Timestamp::try_from(meta.modified())?);
-
+        let m = self.core.hdfs_stat(path)?;
         Ok(RpStat::new(m))
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let client = self.client.clone();
-        let mut f = client
-            .open_file()
-            .read(true)
-            .async_open(&p)
-            .await
-            .map_err(new_std_io_error)?;
-
-        if args.range().offset() != 0 {
-            use futures::AsyncSeekExt;
-
-            f.seek(SeekFrom::Start(args.range().offset()))
-                .await
-                .map_err(new_std_io_error)?;
-        }
+        let f = self.core.hdfs_read(path, &args).await?;
 
         Ok((
             RpRead::new(),
@@ -274,50 +235,8 @@ impl Access for HdfsBackend {
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let target_path = build_rooted_abs_path(&self.root, path);
-        let mut initial_size = 0;
-        let target_exists = match self.client.metadata(&target_path) {
-            Ok(meta) => {
-                initial_size = meta.len();
-                true
-            }
-            Err(err) => {
-                if err.kind() != io::ErrorKind::NotFound {
-                    return Err(new_std_io_error(err));
-                }
-                false
-            }
-        };
-
-        let should_append = op.append() && target_exists;
-        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
-            // If the target file exists, we should append to the end of it directly.
-            (!should_append).then_some(build_rooted_abs_path(
-                atomic_write_dir,
-                &build_tmp_path_of(path),
-            ))
-        });
-
-        if !target_exists {
-            let parent = get_parent(&target_path);
-            self.client.create_dir(parent).map_err(new_std_io_error)?;
-        }
-        if !should_append {
-            initial_size = 0;
-        }
-
-        let mut open_options = self.client.open_file();
-        open_options.create(true);
-        if should_append {
-            open_options.append(true);
-        } else {
-            open_options.write(true);
-        }
-
-        let f = open_options
-            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
-            .await
-            .map_err(new_std_io_error)?;
+        let (target_path, tmp_path, f, target_exists, initial_size) =
+            self.core.hdfs_write(path, &op).await?;
 
         Ok((
             RpWrite::new(),
@@ -325,7 +244,7 @@ impl Access for HdfsBackend {
                 target_path,
                 tmp_path,
                 f,
-                Arc::clone(&self.client),
+                Arc::clone(&self.core.client),
                 target_exists,
                 initial_size,
             ),
@@ -335,73 +254,22 @@ impl Access for HdfsBackend {
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
+            oio::OneShotDeleter::new(HdfsDeleter::new(Arc::clone(&self.core))),
         ))
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let f = match self.client.read_dir(&p) {
-            Ok(f) => f,
-            Err(e) => {
-                return if e.kind() == io::ErrorKind::NotFound {
-                    Ok((RpList::default(), None))
-                } else {
-                    Err(new_std_io_error(e))
-                };
+        match self.core.hdfs_list(path)? {
+            Some(f) => {
+                let rd = HdfsLister::new(&self.core.root, f, path);
+                Ok((RpList::default(), Some(rd)))
             }
-        };
-
-        let rd = HdfsLister::new(&self.root, f, path);
-
-        Ok((RpList::default(), Some(rd)))
+            None => Ok((RpList::default(), None)),
+        }
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
-        let from_path = build_rooted_abs_path(&self.root, from);
-        self.client.metadata(&from_path).map_err(new_std_io_error)?;
-
-        let to_path = build_rooted_abs_path(&self.root, to);
-        let result = self.client.metadata(&to_path);
-        match result {
-            Err(err) => {
-                // Early return if other error happened.
-                if err.kind() != io::ErrorKind::NotFound {
-                    return Err(new_std_io_error(err));
-                }
-
-                let parent = PathBuf::from(&to_path)
-                    .parent()
-                    .ok_or_else(|| {
-                        Error::new(
-                            ErrorKind::Unexpected,
-                            "path should have parent but not, it must be malformed",
-                        )
-                        .with_context("input", &to_path)
-                    })?
-                    .to_path_buf();
-
-                self.client
-                    .create_dir(&parent.to_string_lossy())
-                    .map_err(new_std_io_error)?;
-            }
-            Ok(metadata) => {
-                if metadata.is_file() {
-                    self.client
-                        .remove_file(&to_path)
-                        .map_err(new_std_io_error)?;
-                } else {
-                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
-                        .with_context("input", &to_path));
-                }
-            }
-        }
-
-        self.client
-            .rename_file(&from_path, &to_path)
-            .map_err(new_std_io_error)?;
-
+        self.core.hdfs_rename(from, to)?;
         Ok(RpRename::new())
     }
 }
diff --git a/core/src/services/hdfs/core.rs b/core/src/services/hdfs/core.rs
new file mode 100644
index 000000000..6e8e08bdc
--- /dev/null
+++ b/core/src/services/hdfs/core.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::io::SeekFrom;
+use std::sync::Arc;
+
+use crate::raw::*;
+use crate::*;
+
+/// HdfsCore contains code that directly interacts with HDFS.
+#[derive(Clone)]
+pub struct HdfsCore {
+    pub info: Arc<AccessorInfo>,
+    pub root: String,
+    pub atomic_write_dir: Option<String>,
+    pub client: Arc<hdrs::Client>,
+}
+
+impl Debug for HdfsCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HdfsCore")
+            .field("root", &self.root)
+            .field("atomic_write_dir", &self.atomic_write_dir)
+            .finish_non_exhaustive()
+    }
+}
+
+impl HdfsCore {
+    pub fn hdfs_create_dir(&self, path: &str) -> Result<()> {
+        let p = build_rooted_abs_path(&self.root, path);
+        self.client.create_dir(&p).map_err(new_std_io_error)?;
+        Ok(())
+    }
+
+    pub fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
+        let p = build_rooted_abs_path(&self.root, path);
+        let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
+
+        let mode = if meta.is_dir() {
+            EntryMode::DIR
+        } else if meta.is_file() {
+            EntryMode::FILE
+        } else {
+            EntryMode::Unknown
+        };
+        let mut m = Metadata::new(mode);
+        m.set_content_length(meta.len());
+        m.set_last_modified(Timestamp::try_from(meta.modified())?);
+
+        Ok(m)
+    }
+
+    pub async fn hdfs_read(&self, path: &str, args: &OpRead) -> Result<hdrs::AsyncFile> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let client = self.client.clone();
+        let mut f = client
+            .open_file()
+            .read(true)
+            .async_open(&p)
+            .await
+            .map_err(new_std_io_error)?;
+
+        if args.range().offset() != 0 {
+            use futures::AsyncSeekExt;
+
+            f.seek(SeekFrom::Start(args.range().offset()))
+                .await
+                .map_err(new_std_io_error)?;
+        }
+
+        Ok(f)
+    }
+
+    pub async fn hdfs_write(
+        &self,
+        path: &str,
+        op: &OpWrite,
+    ) -> Result<(String, Option<String>, hdrs::AsyncFile, bool, u64)> {
+        let target_path = build_rooted_abs_path(&self.root, path);
+        let mut initial_size = 0;
+        let target_exists = match self.client.metadata(&target_path) {
+            Ok(meta) => {
+                initial_size = meta.len();
+                true
+            }
+            Err(err) => {
+                if err.kind() != io::ErrorKind::NotFound {
+                    return Err(new_std_io_error(err));
+                }
+                false
+            }
+        };
+
+        let should_append = op.append() && target_exists;
+        let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
+            // If the target file exists, we should append to the end of it directly.
+            (!should_append).then_some(build_rooted_abs_path(
+                atomic_write_dir,
+                &build_tmp_path_of(path),
+            ))
+        });
+
+        if !target_exists {
+            let parent = get_parent(&target_path);
+            self.client.create_dir(parent).map_err(new_std_io_error)?;
+        }
+        if !should_append {
+            initial_size = 0;
+        }
+
+        let mut open_options = self.client.open_file();
+        open_options.create(true);
+        if should_append {
+            open_options.append(true);
+        } else {
+            open_options.write(true);
+        }
+
+        let f = open_options
+            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
+            .await
+            .map_err(new_std_io_error)?;
+
+        Ok((target_path, tmp_path, f, target_exists, initial_size))
+    }
+
+    pub fn hdfs_list(&self, path: &str) -> Result<Option<hdrs::Readdir>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        match self.client.read_dir(&p) {
+            Ok(f) => Ok(Some(f)),
+            Err(e) => {
+                if e.kind() == io::ErrorKind::NotFound {
+                    Ok(None)
+                } else {
+                    Err(new_std_io_error(e))
+                }
+            }
+        }
+    }
+
+    pub fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
+        let from_path = build_rooted_abs_path(&self.root, from);
+        self.client.metadata(&from_path).map_err(new_std_io_error)?;
+
+        let to_path = build_rooted_abs_path(&self.root, to);
+        let result = self.client.metadata(&to_path);
+        match result {
+            Err(err) => {
+                // Early return if other error happened.
+                if err.kind() != io::ErrorKind::NotFound {
+                    return Err(new_std_io_error(err));
+                }
+
+                let parent = std::path::PathBuf::from(&to_path)
+                    .parent()
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "path should have parent but not, it must be malformed",
+                        )
+                        .with_context("input", &to_path)
+                    })?
+                    .to_path_buf();
+
+                self.client
+                    .create_dir(&parent.to_string_lossy())
+                    .map_err(new_std_io_error)?;
+            }
+            Ok(metadata) => {
+                if metadata.is_file() {
+                    self.client
+                        .remove_file(&to_path)
+                        .map_err(new_std_io_error)?;
+                } else {
+                    return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
+                        .with_context("input", &to_path));
+                }
+            }
+        }
+
+        self.client
+            .rename_file(&from_path, &to_path)
+            .map_err(new_std_io_error)?;
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/hdfs/delete.rs b/core/src/services/hdfs/delete.rs
index a95478771..11471d794 100644
--- a/core/src/services/hdfs/delete.rs
+++ b/core/src/services/hdfs/delete.rs
@@ -18,16 +18,16 @@
 use std::io;
 use std::sync::Arc;
 
-use super::backend::HdfsBackend;
+use super::core::HdfsCore;
 use crate::raw::*;
 use crate::*;
 
 pub struct HdfsDeleter {
-    core: Arc<HdfsBackend>,
+    core: Arc<HdfsCore>,
 }
 
 impl HdfsDeleter {
-    pub fn new(core: Arc<HdfsBackend>) -> Self {
+    pub fn new(core: Arc<HdfsCore>) -> Self {
         Self { core }
     }
 }
diff --git a/core/src/services/hdfs/mod.rs b/core/src/services/hdfs/mod.rs
index 240d1474a..9b35636f3 100644
--- a/core/src/services/hdfs/mod.rs
+++ b/core/src/services/hdfs/mod.rs
@@ -17,6 +17,7 @@
 
 /// Default scheme for hdfs service.
 pub(super) const HDFS_SCHEME: &str = "hdfs";
+mod core;
 mod delete;
 mod lister;
 mod reader;
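
The gist of the change: every hdrs-facing code path moves out of HdfsBackend into the new HdfsCore, and HdfsBackend, HdfsWriter, and HdfsDeleter now share that core through an Arc instead of cloning the whole backend (note Arc::clone(&self.core) replacing Arc::new(self.clone()) in delete). Below is a minimal, self-contained sketch of that delegation pattern; MockClient, Core, Backend, and Deleter are illustrative stand-ins for hdrs::Client, HdfsCore, HdfsBackend, and HdfsDeleter, not the real OpenDAL types.

use std::io;
use std::sync::Arc;

/// Stand-in for `hdrs::Client`: the handle that actually talks to HDFS.
struct MockClient;

impl MockClient {
    fn create_dir(&self, path: &str) -> io::Result<()> {
        println!("mkdir {path}");
        Ok(())
    }

    fn remove_file(&self, path: &str) -> io::Result<()> {
        println!("rm {path}");
        Ok(())
    }
}

/// Stand-in for `HdfsCore`: owns the shared state and all client-facing logic.
struct Core {
    root: String,
    client: Arc<MockClient>,
}

impl Core {
    fn abs(&self, path: &str) -> String {
        // Plays the role of `build_rooted_abs_path` in the real code.
        format!("{}{}", self.root, path)
    }

    fn create_dir(&self, path: &str) -> io::Result<()> {
        self.client.create_dir(&self.abs(path))
    }

    fn delete(&self, path: &str) -> io::Result<()> {
        self.client.remove_file(&self.abs(path))
    }
}

/// Stand-in for `HdfsBackend`: after the refactor it is a thin shell
/// around the reference-counted core.
#[derive(Clone)]
struct Backend {
    core: Arc<Core>,
}

/// Stand-in for `HdfsDeleter`: it shares the core rather than holding
/// a clone of the whole backend.
struct Deleter {
    core: Arc<Core>,
}

impl Backend {
    fn deleter(&self) -> Deleter {
        // Mirrors `HdfsDeleter::new(Arc::clone(&self.core))` in the diff.
        Deleter {
            core: Arc::clone(&self.core),
        }
    }
}

fn main() -> io::Result<()> {
    let backend = Backend {
        core: Arc::new(Core {
            root: "/user/opendal/".to_string(),
            client: Arc::new(MockClient),
        }),
    };

    backend.core.create_dir("demo/")?;
    backend.deleter().core.delete("demo/tmp")?;
    Ok(())
}

With one Arc<Core> as the single owner of the client and configuration, helpers stay alive independently of the backend value, and the operator methods in backend.rs shrink to thin wrappers over the hdfs_* methods in core.rs.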
