This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9746efca6 refactor(hdfs-native): restructure HdfsNativeBackend and introduce HdfsNativeCore (#6737)
9746efca6 is described below
commit 9746efca6aaa95776d467e7e5e88c5ec93dfd00d
Author: Kingsword <[email protected]>
AuthorDate: Fri Oct 24 23:19:40 2025 +0800
refactor(hdfs-native): restructure HdfsNativeBackend and introduce HdfsNativeCore (#6737)
---
core/src/services/hdfs_native/backend.rs | 221 ++++++++-----------------------
core/src/services/hdfs_native/core.rs | 214 ++++++++++++++++++++++++++++++
core/src/services/hdfs_native/delete.rs | 16 +--
core/src/services/hdfs_native/mod.rs | 1 +
4 files changed, 272 insertions(+), 180 deletions(-)
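For orientation, the refactor below follows a common pattern: the backend's inline fields (root, client, enable_append) move into a shared HdfsNativeCore, and the backend becomes a thin wrapper around Arc<HdfsNativeCore> whose Access methods delegate to core helpers. The following is a minimal, self-contained Rust sketch of that delegation pattern only; the types and method bodies are simplified stand-ins, not the real opendal or hdfs-native APIs (those are in the diff below).

// Sketch only: illustrates the Arc<Core> delegation pattern this commit applies.
// All names here are illustrative stand-ins, not opendal's actual types.
use std::sync::Arc;

// The core owns the shared state and the low-level operations.
struct Core {
    root: String,
}

impl Core {
    fn stat(&self, path: &str) -> String {
        // The real core would call the HDFS client here (e.g. get_file_info).
        format!("{}/{}", self.root.trim_end_matches('/'), path)
    }
}

// The backend is a thin, cheaply clonable wrapper around Arc<Core>.
#[derive(Clone)]
struct Backend {
    core: Arc<Core>,
}

impl Backend {
    fn stat(&self, path: &str) -> String {
        // Access-trait methods just delegate to the core.
        self.core.stat(path)
    }
}

fn main() {
    let backend = Backend {
        core: Arc::new(Core { root: "/tmp".to_string() }),
    };
    // Helpers such as the deleter can share the same core via Arc::clone,
    // instead of cloning the whole backend.
    let deleter_core = Arc::clone(&backend.core);
    println!("{}", backend.stat("hello.txt"));
    println!("{}", deleter_core.stat("hello.txt"));
}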
diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs
index 70136e6a1..39ac787b6 100644
--- a/core/src/services/hdfs_native/backend.rs
+++ b/core/src/services/hdfs_native/backend.rs
@@ -19,11 +19,10 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
-use hdfs_native::HdfsError;
-use hdfs_native::WriteOptions;
use log::debug;
use super::HDFS_NATIVE_SCHEME;
+use super::core::HdfsNativeCore;
use super::delete::HdfsNativeDeleter;
use super::error::parse_hdfs_error;
use super::lister::HdfsNativeLister;
@@ -110,9 +109,37 @@ impl Builder for HdfsNativeBuilder {
// need to check if root dir exists, create if not
Ok(HdfsNativeBackend {
- root,
- client: Arc::new(client),
- enable_append: self.config.enable_append,
+ core: Arc::new(HdfsNativeCore {
+ info: {
+ let am = AccessorInfo::default();
+ am.set_scheme(HDFS_NATIVE_SCHEME)
+ .set_root(&root)
+ .set_native_capability(Capability {
+ stat: true,
+
+ read: true,
+
+ write: true,
+ write_can_append: self.config.enable_append,
+
+ create_dir: true,
+ delete: true,
+
+ list: true,
+
+ rename: true,
+
+ shared: true,
+
+ ..Default::default()
+ });
+
+ am.into()
+ },
+ root,
+ client: Arc::new(client),
+ enable_append: self.config.enable_append,
+ }),
})
}
}
@@ -128,15 +155,9 @@ impl Builder for HdfsNativeBuilder {
/// Backend for hdfs-native services.
#[derive(Debug, Clone)]
pub struct HdfsNativeBackend {
- pub root: String,
- pub client: Arc<hdfs_native::Client>,
- enable_append: bool,
+ core: Arc<HdfsNativeCore>,
}
-/// hdfs_native::Client is thread-safe.
-unsafe impl Send for HdfsNativeBackend {}
-unsafe impl Sync for HdfsNativeBackend {}
-
impl Access for HdfsNativeBackend {
type Reader = HdfsNativeReader;
type Writer = HdfsNativeWriter;
@@ -144,118 +165,29 @@ impl Access for HdfsNativeBackend {
type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;
fn info(&self) -> Arc<AccessorInfo> {
- let am = AccessorInfo::default();
- am.set_scheme(HDFS_NATIVE_SCHEME)
- .set_root(&self.root)
- .set_native_capability(Capability {
- stat: true,
-
- read: true,
-
- write: true,
- write_can_append: self.enable_append,
-
- create_dir: true,
- delete: true,
-
- list: true,
-
- rename: true,
-
- shared: true,
-
- ..Default::default()
- });
-
- am.into()
+ self.core.info.clone()
}
async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
- let p = build_rooted_abs_path(&self.root, path);
-
- self.client
- .mkdirs(&p, 0o777, true)
- .await
- .map_err(parse_hdfs_error)?;
-
+ self.core.hdfs_create_dir(path).await?;
Ok(RpCreateDir::default())
}
async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let status: hdfs_native::client::FileStatus = self
- .client
- .get_file_info(&p)
- .await
- .map_err(parse_hdfs_error)?;
-
- let mode = if status.isdir {
- EntryMode::DIR
- } else {
- EntryMode::FILE
- };
-
- let mut metadata = Metadata::new(mode);
- metadata
- .set_last_modified(Timestamp::from_millisecond(
- status.modification_time as i64,
- )?)
- .set_content_length(status.length as u64);
-
- Ok(RpStat::new(metadata))
+ let m = self.core.hdfs_stat(path).await?;
+ Ok(RpStat::new(m))
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
- let p = build_rooted_abs_path(&self.root, path);
+ let (f, offset, size) = self.core.hdfs_read(path, &args).await?;
- let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
-
- let r = HdfsNativeReader::new(
- f,
- args.range().offset() as _,
- args.range().size().unwrap_or(u64::MAX) as _,
- );
+ let r = HdfsNativeReader::new(f, offset as _, size as _);
Ok((RpRead::new(), r))
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let target_path = build_rooted_abs_path(&self.root, path);
- let mut initial_size = 0;
-
- let target_exists = match self.client.get_file_info(&target_path).await {
- Ok(status) => {
- initial_size = status.length as u64;
- true
- }
- Err(err) => match &err {
- HdfsError::FileNotFound(_) => false,
- _ => return Err(parse_hdfs_error(err)),
- },
- };
-
- let f = if target_exists {
- if args.append() {
- assert!(self.enable_append, "append is not enabled");
- self.client
- .append(&target_path)
- .await
- .map_err(parse_hdfs_error)?
- } else {
- initial_size = 0;
- self.client
- .create(&target_path, WriteOptions::default().overwrite(true))
- .await
- .map_err(parse_hdfs_error)?
- }
- } else {
- initial_size = 0;
- self.client
- .create(&target_path, WriteOptions::default())
- .await
- .map_err(parse_hdfs_error)?
- };
+ let (f, initial_size) = self.core.hdfs_write(path, &args).await?;
Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
}
@@ -263,74 +195,27 @@ impl Access for HdfsNativeBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
- oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
+ oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::clone(&self.core))),
))
}
async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
- let p: String = build_rooted_abs_path(&self.root, path);
-
- let isdir = match self.client.get_file_info(&p).await {
- Ok(status) => status.isdir,
- Err(err) => {
- return match &err {
- HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
- _ => Err(parse_hdfs_error(err)),
- };
- }
- };
- let current_path = if isdir {
- if !path.ends_with("/") {
- Some(path.to_string() + "/")
- } else {
- Some(path.to_string())
- }
- } else {
- None
- };
-
- Ok((
- RpList::default(),
- Some(HdfsNativeLister::new(
- &self.root,
- &self.client,
- &p,
- current_path,
+ match self.core.hdfs_list(path).await? {
+ Some((p, current_path)) => Ok((
+ RpList::default(),
+ Some(HdfsNativeLister::new(
+ &self.core.root,
+ &self.core.client,
+ &p,
+ current_path,
+ )),
)),
- ))
+ None => Ok((RpList::default(), None)),
+ }
}
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
- let from_path = build_rooted_abs_path(&self.root, from);
- let to_path = build_rooted_abs_path(&self.root, to);
- match self.client.get_file_info(&to_path).await {
- Ok(status) => {
- if status.isdir {
- return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
- .with_context("input", &to_path));
- } else {
- self.client
- .delete(&to_path, true)
- .await
- .map_err(parse_hdfs_error)?;
- }
- }
- Err(err) => match &err {
- HdfsError::FileNotFound(_) => {
- self.client
- .create(&to_path, WriteOptions::default().create_parent(true))
- .await
- .map_err(parse_hdfs_error)?;
- }
- _ => return Err(parse_hdfs_error(err)),
- },
- };
-
- self.client
- .rename(&from_path, &to_path, true)
- .await
- .map_err(parse_hdfs_error)?;
-
+ self.core.hdfs_rename(from, to).await?;
Ok(RpRename::default())
}
}
diff --git a/core/src/services/hdfs_native/core.rs b/core/src/services/hdfs_native/core.rs
new file mode 100644
index 000000000..353458ee5
--- /dev/null
+++ b/core/src/services/hdfs_native/core.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use hdfs_native::HdfsError;
+use hdfs_native::WriteOptions;
+
+use super::error::parse_hdfs_error;
+use crate::raw::*;
+use crate::*;
+
+/// HdfsNativeCore contains the code that directly interacts with the HDFS Native client.
+#[derive(Clone)]
+pub struct HdfsNativeCore {
+ pub info: Arc<AccessorInfo>,
+ pub root: String,
+ pub client: Arc<hdfs_native::Client>,
+ pub enable_append: bool,
+}
+
+impl Debug for HdfsNativeCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HdfsNativeCore")
+ .field("root", &self.root)
+ .field("enable_append", &self.enable_append)
+ .finish_non_exhaustive()
+ }
+}
+
+impl HdfsNativeCore {
+ pub async fn hdfs_create_dir(&self, path: &str) -> Result<()> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ self.client
+ .mkdirs(&p, 0o777, true)
+ .await
+ .map_err(parse_hdfs_error)?;
+
+ Ok(())
+ }
+
+ pub async fn hdfs_stat(&self, path: &str) -> Result<Metadata> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let status: hdfs_native::client::FileStatus = self
+ .client
+ .get_file_info(&p)
+ .await
+ .map_err(parse_hdfs_error)?;
+
+ let mode = if status.isdir {
+ EntryMode::DIR
+ } else {
+ EntryMode::FILE
+ };
+
+ let mut metadata = Metadata::new(mode);
+ metadata
+ .set_last_modified(Timestamp::from_millisecond(
+ status.modification_time as i64,
+ )?)
+ .set_content_length(status.length as u64);
+
+ Ok(metadata)
+ }
+
+ pub async fn hdfs_read(
+ &self,
+ path: &str,
+ args: &OpRead,
+ ) -> Result<(hdfs_native::file::FileReader, u64, u64)> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
+
+ let offset = args.range().offset();
+ let size = args.range().size().unwrap_or(u64::MAX);
+
+ Ok((f, offset, size))
+ }
+
+ pub async fn hdfs_write(
+ &self,
+ path: &str,
+ args: &OpWrite,
+ ) -> Result<(hdfs_native::file::FileWriter, u64)> {
+ let target_path = build_rooted_abs_path(&self.root, path);
+ let mut initial_size = 0;
+
+ let target_exists = match self.client.get_file_info(&target_path).await {
+ Ok(status) => {
+ initial_size = status.length as u64;
+ true
+ }
+ Err(err) => match &err {
+ HdfsError::FileNotFound(_) => false,
+ _ => return Err(parse_hdfs_error(err)),
+ },
+ };
+
+ let f = if target_exists {
+ if args.append() {
+ assert!(self.enable_append, "append is not enabled");
+ self.client
+ .append(&target_path)
+ .await
+ .map_err(parse_hdfs_error)?
+ } else {
+ initial_size = 0;
+ self.client
+ .create(&target_path, WriteOptions::default().overwrite(true))
+ .await
+ .map_err(parse_hdfs_error)?
+ }
+ } else {
+ initial_size = 0;
+ self.client
+ .create(&target_path, WriteOptions::default())
+ .await
+ .map_err(parse_hdfs_error)?
+ };
+
+ Ok((f, initial_size))
+ }
+
+ pub async fn hdfs_delete(&self, path: &str) -> Result<()> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ self.client
+ .delete(&p, true)
+ .await
+ .map_err(parse_hdfs_error)?;
+
+ Ok(())
+ }
+
+ pub async fn hdfs_list(&self, path: &str) -> Result<Option<(String, Option<String>)>> {
+ let p: String = build_rooted_abs_path(&self.root, path);
+
+ let isdir = match self.client.get_file_info(&p).await {
+ Ok(status) => status.isdir,
+ Err(err) => {
+ return match &err {
+ HdfsError::FileNotFound(_) => Ok(None),
+ _ => Err(parse_hdfs_error(err)),
+ };
+ }
+ };
+
+ let current_path = if isdir {
+ if !path.ends_with("/") {
+ Some(path.to_string() + "/")
+ } else {
+ Some(path.to_string())
+ }
+ } else {
+ None
+ };
+
+ Ok(Some((p, current_path)))
+ }
+
+ pub async fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> {
+ let from_path = build_rooted_abs_path(&self.root, from);
+ let to_path = build_rooted_abs_path(&self.root, to);
+
+ match self.client.get_file_info(&to_path).await {
+ Ok(status) => {
+ if status.isdir {
+ return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
+ .with_context("input", &to_path));
+ } else {
+ self.client
+ .delete(&to_path, true)
+ .await
+ .map_err(parse_hdfs_error)?;
+ }
+ }
+ Err(err) => match &err {
+ HdfsError::FileNotFound(_) => {
+ self.client
+ .create(&to_path, WriteOptions::default().create_parent(true))
+ .await
+ .map_err(parse_hdfs_error)?;
+ }
+ _ => return Err(parse_hdfs_error(err)),
+ },
+ };
+
+ self.client
+ .rename(&from_path, &to_path, true)
+ .await
+ .map_err(parse_hdfs_error)?;
+
+ Ok(())
+ }
+}
diff --git a/core/src/services/hdfs_native/delete.rs b/core/src/services/hdfs_native/delete.rs
index 84888b0e8..648f15346 100644
--- a/core/src/services/hdfs_native/delete.rs
+++ b/core/src/services/hdfs_native/delete.rs
@@ -17,31 +17,23 @@
use std::sync::Arc;
-use super::backend::HdfsNativeBackend;
-use super::error::parse_hdfs_error;
+use super::core::HdfsNativeCore;
use crate::raw::*;
use crate::*;
pub struct HdfsNativeDeleter {
- core: Arc<HdfsNativeBackend>,
+ core: Arc<HdfsNativeCore>,
}
impl HdfsNativeDeleter {
- pub fn new(core: Arc<HdfsNativeBackend>) -> Self {
+ pub fn new(core: Arc<HdfsNativeCore>) -> Self {
Self { core }
}
}
impl oio::OneShotDelete for HdfsNativeDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
- let p = build_rooted_abs_path(&self.core.root, &path);
-
- self.core
- .client
- .delete(&p, true)
- .await
- .map_err(parse_hdfs_error)?;
-
+ self.core.hdfs_delete(&path).await?;
Ok(())
}
}
diff --git a/core/src/services/hdfs_native/mod.rs b/core/src/services/hdfs_native/mod.rs
index 625971426..dfc3fe7b1 100644
--- a/core/src/services/hdfs_native/mod.rs
+++ b/core/src/services/hdfs_native/mod.rs
@@ -17,6 +17,7 @@
/// Default scheme for hdfs_native service.
pub(super) const HDFS_NATIVE_SCHEME: &str = "hdfs_native";
+mod core;
mod delete;
mod error;
mod lister;