This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 52562e8d83 feat(services/hdfs): Atomic write for hdfs (#3875)
52562e8d83 is described below
commit 52562e8d8326a80dc6d8fdbfb926bd39a06d8488
Author: Shubham Raizada <[email protected]>
AuthorDate: Wed Jan 3 08:57:57 2024 +0530
feat(services/hdfs): Atomic write for hdfs (#3875)
---
.github/workflows/service_test_hdfs.yml | 37 +++++++++++++
core/src/services/hdfs/backend.rs | 98 +++++++++++++++++++++++++++++----
core/src/services/hdfs/writer.rs | 82 ++++++++++++++++++++++-----
3 files changed, 191 insertions(+), 26 deletions(-)
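For context, the new option is surfaced on the service builder. A minimal sketch of how it might be enabled from user code, assuming the usual OpenDAL Operator builder flow (the `Hdfs` builder, `name_node`, and `root` already exist; only `atomic_write_dir` is new in this commit, and the paths below are placeholders):

    use opendal::services::Hdfs;
    use opendal::{Operator, Result};

    fn hdfs_operator() -> Result<Operator> {
        let mut builder = Hdfs::default();
        builder.name_node("hdfs://localhost:8020");
        builder.root("/tmp/opendal/");
        // New in this commit: stage writes in this directory, then rename
        // the temp file onto the target path when the writer is closed.
        builder.atomic_write_dir("/tmp/atomic_write_dir/opendal/");
        Ok(Operator::new(builder)?.finish())
    }

When append is requested and the target file already exists, the backend skips the temp file and appends to the target directly, as the builder notes in the diff below explain.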
diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index 3533ad0be6..1e1dae9f7e 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -136,3 +136,40 @@ jobs:
OPENDAL_HDFS_ROOT: /tmp/opendal/
OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
OPENDAL_HDFS_ENABLE_APPEND: true
+
+  hdfs-default-with-atomic-write-dir:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+        with:
+          need-nextest: true
+
+      - name: Setup java env
+        uses: actions/setup-java@v4
+        with:
+          distribution: temurin
+          java-version: "11"
+      - name: Setup hadoop env
+        shell: bash
+        run: |
+          curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: |
+          export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
+          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
+
+          cargo test behavior --features tests,services-hdfs
+        env:
+          HADOOP_HOME: "/home/runner/hadoop-3.3.5"
+          OPENDAL_TEST: hdfs
+          OPENDAL_HDFS_ROOT: /tmp/opendal/
+          OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/
+          OPENDAL_HDFS_NAME_NODE: default
+          OPENDAL_HDFS_ENABLE_APPEND: false
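The OPENDAL_HDFS_* variables above are read by the behavior-test harness. In application code the same settings can be supplied through the map-based constructor; a hedged sketch, assuming `Operator::via_map` and the serde-derived keys of the `HdfsConfig` struct shown in the next file:

    use std::collections::HashMap;
    use opendal::{Operator, Result, Scheme};

    fn hdfs_from_map() -> Result<Operator> {
        let mut map = HashMap::new();
        map.insert("name_node".to_string(), "hdfs://localhost:8020".to_string());
        map.insert("root".to_string(), "/tmp/opendal/".to_string());
        // Mirrors OPENDAL_HDFS_ATOMIC_WRITE_DIR in the workflow above.
        map.insert(
            "atomic_write_dir".to_string(),
            "/tmp/atomic_write_dir/opendal/".to_string(),
        );
        Operator::via_map(Scheme::Hdfs, map)
    }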
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index b9255a16d1..cd2ee0b262 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -25,6 +25,7 @@ use async_trait::async_trait;
use futures::AsyncWriteExt;
use log::debug;
use serde::Deserialize;
+use uuid::Uuid;
use super::lister::HdfsLister;
use super::writer::HdfsWriter;
@@ -48,6 +49,8 @@ pub struct HdfsConfig {
pub user: Option<String>,
/// enable the append capacity
pub enable_append: bool,
+ /// atomic_write_dir of this backend
+ pub atomic_write_dir: Option<String>,
}
impl Debug for HdfsConfig {
@@ -133,6 +136,21 @@ impl HdfsBuilder {
self.config.enable_append = enable_append;
self
}
+
+ /// Set temp dir for atomic write.
+ ///
+ /// # Notes
+ ///
+ /// - When append is enabled, we will not use atomic write
+ /// to avoid data loss and performance issues.
+ pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
+ self.config.atomic_write_dir = if dir.is_empty() {
+ None
+ } else {
+ Some(String::from(dir))
+ };
+ self
+ }
}
impl Builder for HdfsBuilder {
@@ -181,19 +199,40 @@ impl Builder for HdfsBuilder {
}
}
+ let atomic_write_dir = self.config.atomic_write_dir.take();
+
+ // If the atomic write dir does not exist, we must create it.
+ if let Some(d) = &atomic_write_dir {
+ if let Err(e) = client.metadata(d) {
+ if e.kind() == io::ErrorKind::NotFound {
+ client.create_dir(d).map_err(new_std_io_error)?
+ }
+ }
+ }
+
debug!("backend build finished: {:?}", &self);
Ok(HdfsBackend {
root,
+ atomic_write_dir,
client: Arc::new(client),
enable_append: self.config.enable_append,
})
}
}
+#[inline]
+fn tmp_file_of(path: &str) -> String {
+ let name = get_basename(path);
+ let uuid = Uuid::new_v4().to_string();
+
+ format!("{name}.{uuid}")
+}
+
/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
root: String,
+ atomic_write_dir: Option<String>,
client: Arc<hdrs::Client>,
enable_append: bool,
}
@@ -263,15 +302,28 @@ impl Accessor for HdfsBackend {
}
async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let p = build_rooted_abs_path(&self.root, path);
+ let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
+ let target_path = build_rooted_abs_path(&self.root, path);
+ let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
+
+ // If the target file exists, we should append to the end of it directly.
+ if op.append() && self.client.metadata(&target_path).is_ok() {
+ (target_path, None)
+ } else {
+ (target_path, Some(tmp_path))
+ }
+ } else {
+ let p = build_rooted_abs_path(&self.root, path);
+ (p, None)
+ };
- if let Err(err) = self.client.metadata(&p) {
+ if let Err(err) = self.client.metadata(&target_path) {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}
- let parent = get_parent(&p);
+ let parent = get_parent(&target_path);
self.client.create_dir(parent).map_err(new_std_io_error)?;
@@ -280,7 +332,7 @@ impl Accessor for HdfsBackend {
.open_file()
.create(true)
.write(true)
- .async_open(&p)
+ .async_open(&target_path)
.await
.map_err(new_std_io_error)?;
f.close().await.map_err(new_std_io_error)?;
@@ -295,11 +347,14 @@ impl Accessor for HdfsBackend {
}
let f = open_options
- .async_open(&p)
+ .async_open(tmp_path.as_ref().unwrap_or(&target_path))
.await
.map_err(new_std_io_error)?;
- Ok((RpWrite::new(), HdfsWriter::new(f)))
+ Ok((
+ RpWrite::new(),
+ HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
+ ))
}
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
@@ -438,15 +493,29 @@ impl Accessor for HdfsBackend {
}
fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
- let p = build_rooted_abs_path(&self.root, path);
+ let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
+ let target_path = build_rooted_abs_path(&self.root, path);
+ let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
- if let Err(err) = self.client.metadata(&p) {
+ // If the target file exists, we should append to the end of it directly.
+ if op.append() && self.client.metadata(&target_path).is_ok() {
+ (target_path, None)
+ } else {
+ (target_path, Some(tmp_path))
+ }
+ } else {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ (p, None)
+ };
+
+ if let Err(err) = self.client.metadata(&target_path) {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}
- let parent = get_parent(&p);
+ let parent = get_parent(&target_path);
self.client.create_dir(parent).map_err(new_std_io_error)?;
@@ -454,7 +523,7 @@ impl Accessor for HdfsBackend {
.open_file()
.create(true)
.write(true)
- .open(&p)
+ .open(&target_path)
.map_err(new_std_io_error)?;
}
@@ -466,9 +535,14 @@ impl Accessor for HdfsBackend {
open_options.write(true);
}
- let f = open_options.open(&p).map_err(new_std_io_error)?;
+ let f = open_options
+ .open(tmp_path.as_ref().unwrap_or(&target_path))
+ .map_err(new_std_io_error)?;
- Ok((RpWrite::new(), HdfsWriter::new(f)))
+ Ok((
+ RpWrite::new(),
+ HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
+ ))
}
fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 4990df40a9..a2ddd5c002 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -15,56 +15,110 @@
// specific language governing permissions and limitations
// under the License.
+use futures::future::BoxFuture;
use std::io::Write;
use std::pin::Pin;
-use std::task::Context;
+use std::sync::Arc;
use std::task::Poll;
+use std::task::{ready, Context};
use async_trait::async_trait;
-use futures::AsyncWrite;
+use futures::{AsyncWrite, AsyncWriteExt, FutureExt};
use crate::raw::*;
use crate::*;
pub struct HdfsWriter<F> {
- f: F,
+ target_path: String,
+ tmp_path: Option<String>,
+ f: Option<F>,
+ client: Arc<hdrs::Client>,
+ fut: Option<BoxFuture<'static, Result<()>>>,
}
+/// # Safety
+///
+/// We will only take `&mut Self` references to HdfsWriter.
+unsafe impl<F> Sync for HdfsWriter<F> {}
+
impl<F> HdfsWriter<F> {
- pub fn new(f: F) -> Self {
- Self { f }
+ pub fn new(
+ target_path: String,
+ tmp_path: Option<String>,
+ f: F,
+ client: Arc<hdrs::Client>,
+ ) -> Self {
+ Self {
+ target_path,
+ tmp_path,
+ f: Some(f),
+ client,
+ fut: None,
+ }
}
}
#[async_trait]
impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
- Pin::new(&mut self.f)
+ let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+
+ Pin::new(f)
.poll_write(cx, bs.chunk())
.map_err(new_std_io_error)
}
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ loop {
+ if let Some(fut) = self.fut.as_mut() {
+ let res = ready!(fut.poll_unpin(cx));
+ self.fut = None;
+ return Poll::Ready(res);
+ }
+
+ let mut f = self.f.take().expect("HdfsWriter must be initialized");
+ let tmp_path = self.tmp_path.clone();
+ let target_path = self.target_path.clone();
+ // Clone the client so it can be moved into the future.
+ let client = self.client.clone();
+
+ self.fut = Some(Box::pin(async move {
+ f.close().await.map_err(new_std_io_error)?;
+
+ if let Some(tmp_path) = tmp_path {
+ client
+ .rename_file(&tmp_path, &target_path)
+ .map_err(new_std_io_error)?;
+ }
+
+ Ok(())
+ }));
+ }
+ }
+
fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"HdfsWriter doesn't support abort",
)))
}
-
- fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.f)
- .poll_close(cx)
- .map_err(new_std_io_error)
- }
}
impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
- self.f.write(bs.chunk()).map_err(new_std_io_error)
+ let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+ f.write(bs.chunk()).map_err(new_std_io_error)
}
fn close(&mut self) -> Result<()> {
- self.f.flush().map_err(new_std_io_error)?;
+ let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+ f.flush().map_err(new_std_io_error)?;
+
+ if let Some(tmp_path) = &self.tmp_path {
+ self.client
+ .rename_file(tmp_path, &self.target_path)
+ .map_err(new_std_io_error)?;
+ }
Ok(())
}
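A closing note on the async path: poll_close above is a small state machine, so the rename only runs after the temp file has been fully flushed and closed. Distilled into a standalone sketch (CommitFuture, CloseState, and make_commit are illustrative names only, not OpenDAL or hdrs APIs):

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{ready, Context, Poll};

    type CommitFuture = Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>>;

    struct CloseState {
        fut: Option<CommitFuture>,
    }

    impl CloseState {
        fn poll_close(
            &mut self,
            cx: &mut Context<'_>,
            make_commit: impl FnOnce() -> CommitFuture,
        ) -> Poll<std::io::Result<()>> {
            // First call: build the commit future (close the temp file, then
            // rename it over the target) and fall through to polling it.
            if self.fut.is_none() {
                self.fut = Some(make_commit());
            }
            // Drive the stored future; clear it once it completes so a later
            // poll does not re-run the rename.
            let res = ready!(self.fut.as_mut().unwrap().as_mut().poll(cx));
            self.fut = None;
            Poll::Ready(res)
        }
    }

Holding the future on self is also why the writer now keeps `f` in an `Option` and carries an `Arc<hdrs::Client>`: both must be moved into the boxed 'static future that performs the final close and rename.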