This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new a3c2e288f refactor(hdfs): restructure HdfsBackend and introduce HdfsCore (#6736)
a3c2e288f is described below
commit a3c2e288f341ae6cd8ae46ec2dd55fdab36e760a
Author: Kingsword <[email protected]>
AuthorDate: Fri Oct 24 23:19:12 2025 +0800
refactor(hdfs): restructure HdfsBackend and introduce HdfsCore (#6736)
---
core/src/services/hdfs/backend.rs | 212 +++++++-------------------------------
core/src/services/hdfs/core.rs | 206 ++++++++++++++++++++++++++++++++++++
core/src/services/hdfs/delete.rs | 6 +-
core/src/services/hdfs/mod.rs | 1 +
4 files changed, 250 insertions(+), 175 deletions(-)
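In short: every direct hdrs::Client interaction moves out of HdfsBackend into a
new HdfsCore struct, matching the core/backend split used by other opendal
services, and HdfsBackend shrinks to a thin Access implementation over the
shared core. A simplified sketch of the resulting shape (fields copied from the
diff below; not the complete definitions):

    use std::sync::Arc;

    // AccessorInfo is opendal's internal capability descriptor;
    // hdrs::Client is the underlying HDFS client.
    pub struct HdfsCore {
        pub info: Arc<AccessorInfo>,
        pub root: String,
        pub atomic_write_dir: Option<String>,
        pub client: Arc<hdrs::Client>,
    }

    // The backend keeps only a handle to the shared core.
    pub struct HdfsBackend {
        core: Arc<HdfsCore>,
    }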
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index b3705e4c0..7a70f4e3f 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -18,13 +18,12 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
-use std::io::SeekFrom;
-use std::path::PathBuf;
use std::sync::Arc;
use log::debug;
use super::HDFS_SCHEME;
+use super::core::HdfsCore;
use super::delete::HdfsDeleter;
use super::lister::HdfsLister;
use super::reader::HdfsReader;
@@ -165,35 +164,37 @@ impl Builder for HdfsBuilder {
}
Ok(HdfsBackend {
-            info: {
-                let am = AccessorInfo::default();
-                am.set_scheme(HDFS_SCHEME)
-                    .set_root(&root)
-                    .set_native_capability(Capability {
-                        stat: true,
-                        read: true,
-                        write: true,
-                        write_can_append: self.config.enable_append,
-                        create_dir: true,
-                        delete: true,
-                        list: true,
-                        rename: true,
-                        shared: true,
-                        ..Default::default()
-                    });
-                am.into()
-            },
-            root,
-            atomic_write_dir,
-            client: Arc::new(client),
+            core: Arc::new(HdfsCore {
+                info: {
+                    let am = AccessorInfo::default();
+                    am.set_scheme(HDFS_SCHEME)
+                        .set_root(&root)
+                        .set_native_capability(Capability {
+                            stat: true,
+                            read: true,
+                            write: true,
+                            write_can_append: self.config.enable_append,
+                            create_dir: true,
+                            delete: true,
+                            list: true,
+                            rename: true,
+                            shared: true,
+                            ..Default::default()
+                        });
+                    am.into()
+                },
+                root,
+                atomic_write_dir,
+                client: Arc::new(client),
+            }),
})
}
}
@@ -201,16 +202,9 @@ impl Builder for HdfsBuilder {
/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
- pub info: Arc<AccessorInfo>,
- pub root: String,
- atomic_write_dir: Option<String>,
- pub client: Arc<hdrs::Client>,
+ core: Arc<HdfsCore>,
}
-/// hdrs::Client is thread-safe.
-unsafe impl Send for HdfsBackend {}
-unsafe impl Sync for HdfsBackend {}
-
impl Access for HdfsBackend {
type Reader = HdfsReader<hdrs::AsyncFile>;
type Writer = HdfsWriter<hdrs::AsyncFile>;
@@ -218,54 +212,21 @@ impl Access for HdfsBackend {
type Deleter = oio::OneShotDeleter<HdfsDeleter>;
fn info(&self) -> Arc<AccessorInfo> {
- self.info.clone()
+ self.core.info.clone()
}
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
- let p = build_rooted_abs_path(&self.root, path);
-
- self.client.create_dir(&p).map_err(new_std_io_error)?;
-
+ self.core.hdfs_create_dir(path)?;
Ok(RpCreateDir::default())
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
-
- let mode = if meta.is_dir() {
- EntryMode::DIR
- } else if meta.is_file() {
- EntryMode::FILE
- } else {
- EntryMode::Unknown
- };
- let mut m = Metadata::new(mode);
- m.set_content_length(meta.len());
- m.set_last_modified(Timestamp::try_from(meta.modified())?);
-
+ let m = self.core.hdfs_stat(path)?;
Ok(RpStat::new(m))
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let client = self.client.clone();
- let mut f = client
- .open_file()
- .read(true)
- .async_open(&p)
- .await
- .map_err(new_std_io_error)?;
-
- if args.range().offset() != 0 {
- use futures::AsyncSeekExt;
-
- f.seek(SeekFrom::Start(args.range().offset()))
- .await
- .map_err(new_std_io_error)?;
- }
+ let f = self.core.hdfs_read(path, &args).await?;
Ok((
RpRead::new(),
@@ -274,50 +235,8 @@ impl Access for HdfsBackend {
}
async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let target_path = build_rooted_abs_path(&self.root, path);
- let mut initial_size = 0;
- let target_exists = match self.client.metadata(&target_path) {
- Ok(meta) => {
- initial_size = meta.len();
- true
- }
- Err(err) => {
- if err.kind() != io::ErrorKind::NotFound {
- return Err(new_std_io_error(err));
- }
- false
- }
- };
-
- let should_append = op.append() && target_exists;
- let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
- // If the target file exists, we should append to the end of it directly.
- (!should_append).then_some(build_rooted_abs_path(
- atomic_write_dir,
- &build_tmp_path_of(path),
- ))
- });
-
- if !target_exists {
- let parent = get_parent(&target_path);
- self.client.create_dir(parent).map_err(new_std_io_error)?;
- }
- if !should_append {
- initial_size = 0;
- }
-
- let mut open_options = self.client.open_file();
- open_options.create(true);
- if should_append {
- open_options.append(true);
- } else {
- open_options.write(true);
- }
-
- let f = open_options
- .async_open(tmp_path.as_ref().unwrap_or(&target_path))
- .await
- .map_err(new_std_io_error)?;
+ let (target_path, tmp_path, f, target_exists, initial_size) =
+ self.core.hdfs_write(path, &op).await?;
Ok((
RpWrite::new(),
@@ -325,7 +244,7 @@ impl Access for HdfsBackend {
target_path,
tmp_path,
f,
- Arc::clone(&self.client),
+ Arc::clone(&self.core.client),
target_exists,
initial_size,
),
@@ -335,73 +254,22 @@ impl Access for HdfsBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
- oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
+ oio::OneShotDeleter::new(HdfsDeleter::new(Arc::clone(&self.core))),
))
}
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let f = match self.client.read_dir(&p) {
- Ok(f) => f,
- Err(e) => {
- return if e.kind() == io::ErrorKind::NotFound {
- Ok((RpList::default(), None))
- } else {
- Err(new_std_io_error(e))
- };
+ match self.core.hdfs_list(path)? {
+ Some(f) => {
+ let rd = HdfsLister::new(&self.core.root, f, path);
+ Ok((RpList::default(), Some(rd)))
}
- };
-
- let rd = HdfsLister::new(&self.root, f, path);
-
- Ok((RpList::default(), Some(rd)))
+ None => Ok((RpList::default(), None)),
+ }
}
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
- let from_path = build_rooted_abs_path(&self.root, from);
- self.client.metadata(&from_path).map_err(new_std_io_error)?;
-
- let to_path = build_rooted_abs_path(&self.root, to);
- let result = self.client.metadata(&to_path);
- match result {
- Err(err) => {
- // Early return if other error happened.
- if err.kind() != io::ErrorKind::NotFound {
- return Err(new_std_io_error(err));
- }
-
- let parent = PathBuf::from(&to_path)
- .parent()
- .ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- "path should have parent but not, it must be
malformed",
- )
- .with_context("input", &to_path)
- })?
- .to_path_buf();
-
- self.client
- .create_dir(&parent.to_string_lossy())
- .map_err(new_std_io_error)?;
- }
- Ok(metadata) => {
- if metadata.is_file() {
- self.client
- .remove_file(&to_path)
- .map_err(new_std_io_error)?;
- } else {
- return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
- .with_context("input", &to_path));
- }
- }
- }
-
- self.client
- .rename_file(&from_path, &to_path)
- .map_err(new_std_io_error)?;
-
+ self.core.hdfs_rename(from, to)?;
Ok(RpRename::new())
}
}
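Worth noting in the hunks above: the hand-written `unsafe impl Send`/`Sync` for
HdfsBackend is dropped entirely. Because hdrs::Client is itself thread-safe (as
the deleted comment stated), the auto traits now propagate through
Arc<HdfsCore> without any unsafe code. A minimal compile-time check of that
claim, not part of the patch:

    // If this compiles, HdfsBackend is Send + Sync with no unsafe impls.
    fn assert_send_sync<T: Send + Sync>() {}

    fn _check() {
        assert_send_sync::<HdfsBackend>();
    }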
diff --git a/core/src/services/hdfs/core.rs b/core/src/services/hdfs/core.rs
new file mode 100644
index 000000000..6e8e08bdc
--- /dev/null
+++ b/core/src/services/hdfs/core.rs
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::io::SeekFrom;
+use std::sync::Arc;
+
+use crate::raw::*;
+use crate::*;
+
+/// HdfsCore contains code that directly interacts with HDFS.
+#[derive(Clone)]
+pub struct HdfsCore {
+ pub info: Arc<AccessorInfo>,
+ pub root: String,
+ pub atomic_write_dir: Option<String>,
+ pub client: Arc<hdrs::Client>,
+}
+
+impl Debug for HdfsCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HdfsCore")
+ .field("root", &self.root)
+ .field("atomic_write_dir", &self.atomic_write_dir)
+ .finish_non_exhaustive()
+ }
+}
+
+impl HdfsCore {
+ pub fn hdfs_create_dir(&self, path: &str) -> Result<()> {
+ let p = build_rooted_abs_path(&self.root, path);
+ self.client.create_dir(&p).map_err(new_std_io_error)?;
+ Ok(())
+ }
+
+ pub fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
+ let p = build_rooted_abs_path(&self.root, path);
+ let meta = self.client.metadata(&p).map_err(new_std_io_error)?;
+
+ let mode = if meta.is_dir() {
+ EntryMode::DIR
+ } else if meta.is_file() {
+ EntryMode::FILE
+ } else {
+ EntryMode::Unknown
+ };
+ let mut m = Metadata::new(mode);
+ m.set_content_length(meta.len());
+ m.set_last_modified(Timestamp::try_from(meta.modified())?);
+
+ Ok(m)
+ }
+
+ pub async fn hdfs_read(&self, path: &str, args: &OpRead) -> Result<hdrs::AsyncFile> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let client = self.client.clone();
+ let mut f = client
+ .open_file()
+ .read(true)
+ .async_open(&p)
+ .await
+ .map_err(new_std_io_error)?;
+
+ if args.range().offset() != 0 {
+ use futures::AsyncSeekExt;
+
+ f.seek(SeekFrom::Start(args.range().offset()))
+ .await
+ .map_err(new_std_io_error)?;
+ }
+
+ Ok(f)
+ }
+
+ pub async fn hdfs_write(
+ &self,
+ path: &str,
+ op: &OpWrite,
+ ) -> Result<(String, Option<String>, hdrs::AsyncFile, bool, u64)> {
+ let target_path = build_rooted_abs_path(&self.root, path);
+ let mut initial_size = 0;
+ let target_exists = match self.client.metadata(&target_path) {
+ Ok(meta) => {
+ initial_size = meta.len();
+ true
+ }
+ Err(err) => {
+ if err.kind() != io::ErrorKind::NotFound {
+ return Err(new_std_io_error(err));
+ }
+ false
+ }
+ };
+
+ let should_append = op.append() && target_exists;
+ let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
+ // If the target file exists, we should append to the end of it directly.
+ (!should_append).then_some(build_rooted_abs_path(
+ atomic_write_dir,
+ &build_tmp_path_of(path),
+ ))
+ });
+
+ if !target_exists {
+ let parent = get_parent(&target_path);
+ self.client.create_dir(parent).map_err(new_std_io_error)?;
+ }
+ if !should_append {
+ initial_size = 0;
+ }
+
+ let mut open_options = self.client.open_file();
+ open_options.create(true);
+ if should_append {
+ open_options.append(true);
+ } else {
+ open_options.write(true);
+ }
+
+ let f = open_options
+ .async_open(tmp_path.as_ref().unwrap_or(&target_path))
+ .await
+ .map_err(new_std_io_error)?;
+
+ Ok((target_path, tmp_path, f, target_exists, initial_size))
+ }
+
+ pub fn hdfs_list(&self, path: &str) -> Result<Option<hdrs::Readdir>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ match self.client.read_dir(&p) {
+ Ok(f) => Ok(Some(f)),
+ Err(e) => {
+ if e.kind() == io::ErrorKind::NotFound {
+ Ok(None)
+ } else {
+ Err(new_std_io_error(e))
+ }
+ }
+ }
+ }
+
+ pub fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
+ let from_path = build_rooted_abs_path(&self.root, from);
+ self.client.metadata(&from_path).map_err(new_std_io_error)?;
+
+ let to_path = build_rooted_abs_path(&self.root, to);
+ let result = self.client.metadata(&to_path);
+ match result {
+ Err(err) => {
+ // Early return if other error happened.
+ if err.kind() != io::ErrorKind::NotFound {
+ return Err(new_std_io_error(err));
+ }
+
+ let parent = std::path::PathBuf::from(&to_path)
+ .parent()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "path should have parent but not, it must be
malformed",
+ )
+ .with_context("input", &to_path)
+ })?
+ .to_path_buf();
+
+ self.client
+ .create_dir(&parent.to_string_lossy())
+ .map_err(new_std_io_error)?;
+ }
+ Ok(metadata) => {
+ if metadata.is_file() {
+ self.client
+ .remove_file(&to_path)
+ .map_err(new_std_io_error)?;
+ } else {
+ return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
+ .with_context("input", &to_path));
+ }
+ }
+ }
+
+ self.client
+ .rename_file(&from_path, &to_path)
+ .map_err(new_std_io_error)?;
+
+ Ok(())
+ }
+}
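For orientation, a hypothetical caller-side sketch of the new core methods
(illustrative only: `core` stands for an HdfsCore built as in the Builder
above, and the paths are made up):

    fn example(core: &HdfsCore) -> Result<()> {
        // Paths are service-relative; each method joins them onto `root`
        // with build_rooted_abs_path before touching hdrs.
        core.hdfs_create_dir("dir/")?;

        let meta = core.hdfs_stat("dir/")?;
        assert!(meta.mode().is_dir());

        // hdfs_list maps NotFound to Ok(None) so the backend can answer
        // with an empty listing instead of surfacing an error.
        let entries = core.hdfs_list("dir/")?;
        assert!(entries.is_some());
        Ok(())
    }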
diff --git a/core/src/services/hdfs/delete.rs b/core/src/services/hdfs/delete.rs
index a95478771..11471d794 100644
--- a/core/src/services/hdfs/delete.rs
+++ b/core/src/services/hdfs/delete.rs
@@ -18,16 +18,16 @@
use std::io;
use std::sync::Arc;
-use super::backend::HdfsBackend;
+use super::core::HdfsCore;
use crate::raw::*;
use crate::*;
pub struct HdfsDeleter {
- core: Arc<HdfsBackend>,
+ core: Arc<HdfsCore>,
}
impl HdfsDeleter {
- pub fn new(core: Arc<HdfsBackend>) -> Self {
+ pub fn new(core: Arc<HdfsCore>) -> Self {
Self { core }
}
}
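This pairs with the new delete() body in backend.rs: previously each call
cloned the whole backend into a fresh Arc, while now the deleter shares the
already-existing core with a plain refcount bump. Side by side (the
`let deleter` bindings are illustrative, not in the patch):

    // before: allocate a new Arc around a full clone of the backend
    let deleter = HdfsDeleter::new(Arc::new(self.clone()));
    // after: share the core that already exists
    let deleter = HdfsDeleter::new(Arc::clone(&self.core));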
diff --git a/core/src/services/hdfs/mod.rs b/core/src/services/hdfs/mod.rs
index 240d1474a..9b35636f3 100644
--- a/core/src/services/hdfs/mod.rs
+++ b/core/src/services/hdfs/mod.rs
@@ -17,6 +17,7 @@
/// Default scheme for hdfs service.
pub(super) const HDFS_SCHEME: &str = "hdfs";
+mod core;
mod delete;
mod lister;
mod reader;
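Nothing changes at the public Operator level. A user-facing sketch
(hypothetical name node URL and root, assumes tokio; the exact builder setter
style depends on your opendal version):

    use opendal::services::Hdfs;
    use opendal::Operator;

    #[tokio::main]
    async fn main() -> opendal::Result<()> {
        let builder = Hdfs::default()
            .name_node("hdfs://127.0.0.1:9000") // hypothetical endpoint
            .root("/tmp/opendal");

        let op = Operator::new(builder)?.finish();
        op.create_dir("dir/").await?;
        op.write("dir/hello.txt", "Hello, HDFS!").await?;
        assert_eq!(op.stat("dir/hello.txt").await?.content_length(), 12);
        Ok(())
    }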