This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 52562e8d83 feat(services/hdfs): Atomic write for hdfs (#3875)
52562e8d83 is described below

commit 52562e8d8326a80dc6d8fdbfb926bd39a06d8488
Author: Shubham Raizada <[email protected]>
AuthorDate: Wed Jan 3 08:57:57 2024 +0530

    feat(services/hdfs): Atomic write for hdfs (#3875)
---
 .github/workflows/service_test_hdfs.yml | 37 +++++++++++++
 core/src/services/hdfs/backend.rs       | 98 +++++++++++++++++++++++++++++----
 core/src/services/hdfs/writer.rs        | 82 ++++++++++++++++++++++-----
 3 files changed, 191 insertions(+), 26 deletions(-)
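
For readers who want to try the new option, here is a minimal sketch of how it
might be enabled through the public builder. The `Hdfs` service builder and the
`Operator` setup are the usual OpenDAL entry points and are assumed here rather
than shown in this diff; the paths and file name are placeholders.

    use opendal::services::Hdfs;
    use opendal::{Operator, Result};

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut builder = Hdfs::default();
        builder.name_node("hdfs://localhost:8020");
        builder.root("/tmp/opendal/");
        // New in this commit: writes first land in a temp file under this
        // dir, then get renamed over the target path on close.
        builder.atomic_write_dir("/tmp/atomic_write_dir/opendal/");

        let op = Operator::new(builder)?.finish();
        op.write("hello.txt", "Hello, HDFS!").await?;
        Ok(())
    }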

diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml
index 3533ad0be6..1e1dae9f7e 100644
--- a/.github/workflows/service_test_hdfs.yml
+++ b/.github/workflows/service_test_hdfs.yml
@@ -136,3 +136,40 @@ jobs:
           OPENDAL_HDFS_ROOT: /tmp/opendal/
           OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
           OPENDAL_HDFS_ENABLE_APPEND: true
+
+  hdfs-default-with-atomic-write-dir:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+        with:
+          need-nextest: true
+
+      - name: Setup java env
+        uses: actions/setup-java@v4
+        with:
+          distribution: temurin
+          java-version: "11"
+      - name: Setup hadoop env
+        shell: bash
+        run: |
+          curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: |
+          export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+          export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native
+          cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml
+
+          cargo test behavior --features tests,services-hdfs
+        env:
+          HADOOP_HOME: "/home/runner/hadoop-3.3.5"
+          OPENDAL_TEST: hdfs
+          OPENDAL_HDFS_ROOT: /tmp/opendal/
+          OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/
+          OPENDAL_HDFS_NAME_NODE: default
+          OPENDAL_HDFS_ENABLE_APPEND: false
diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs
index b9255a16d1..cd2ee0b262 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -25,6 +25,7 @@ use async_trait::async_trait;
 use futures::AsyncWriteExt;
 use log::debug;
 use serde::Deserialize;
+use uuid::Uuid;
 
 use super::lister::HdfsLister;
 use super::writer::HdfsWriter;
@@ -48,6 +49,8 @@ pub struct HdfsConfig {
     pub user: Option<String>,
     /// enable the append capacity
     pub enable_append: bool,
+    /// atomic_write_dir of this backend
+    pub atomic_write_dir: Option<String>,
 }
 
 impl Debug for HdfsConfig {
@@ -133,6 +136,21 @@ impl HdfsBuilder {
         self.config.enable_append = enable_append;
         self
     }
+
+    /// Set temp dir for atomic write.
+    ///
+    /// # Notes
+    ///
+    /// - When append is enabled, we will not use atomic writes
+    ///   to avoid data loss and performance issues.
+    pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
+        self.config.atomic_write_dir = if dir.is_empty() {
+            None
+        } else {
+            Some(String::from(dir))
+        };
+        self
+    }
 }
 
 impl Builder for HdfsBuilder {
@@ -181,19 +199,40 @@ impl Builder for HdfsBuilder {
             }
         }
 
+        let atomic_write_dir = self.config.atomic_write_dir.take();
+
+        // If the atomic write dir does not exist, we must create it.
+        if let Some(d) = &atomic_write_dir {
+            if let Err(e) = client.metadata(d) {
+                if e.kind() == io::ErrorKind::NotFound {
+                    client.create_dir(d).map_err(new_std_io_error)?
+                }
+            }
+        }
+
         debug!("backend build finished: {:?}", &self);
         Ok(HdfsBackend {
             root,
+            atomic_write_dir,
             client: Arc::new(client),
             enable_append: self.config.enable_append,
         })
     }
 }
 
+#[inline]
+fn tmp_file_of(path: &str) -> String {
+    let name = get_basename(path);
+    let uuid = Uuid::new_v4().to_string();
+
+    format!("{name}.{uuid}")
+}
+
 /// Backend for hdfs services.
 #[derive(Debug, Clone)]
 pub struct HdfsBackend {
     root: String,
+    atomic_write_dir: Option<String>,
     client: Arc<hdrs::Client>,
     enable_append: bool,
 }
@@ -263,15 +302,28 @@ impl Accessor for HdfsBackend {
     }
 
    async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let p = build_rooted_abs_path(&self.root, path);
+        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
+            let target_path = build_rooted_abs_path(&self.root, path);
+            let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
+
+            // If the target file exists, we should append to the end of it directly.
+            if op.append() && self.client.metadata(&target_path).is_ok() {
+                (target_path, None)
+            } else {
+                (target_path, Some(tmp_path))
+            }
+        } else {
+            let p = build_rooted_abs_path(&self.root, path);
+            (p, None)
+        };
 
-        if let Err(err) = self.client.metadata(&p) {
+        if let Err(err) = self.client.metadata(&target_path) {
             // Early return if other error happened.
             if err.kind() != io::ErrorKind::NotFound {
                 return Err(new_std_io_error(err));
             }
 
-            let parent = get_parent(&p);
+            let parent = get_parent(&target_path);
 
             self.client.create_dir(parent).map_err(new_std_io_error)?;
 
@@ -280,7 +332,7 @@ impl Accessor for HdfsBackend {
                 .open_file()
                 .create(true)
                 .write(true)
-                .async_open(&p)
+                .async_open(&target_path)
                 .await
                 .map_err(new_std_io_error)?;
             f.close().await.map_err(new_std_io_error)?;
@@ -295,11 +347,14 @@ impl Accessor for HdfsBackend {
         }
 
         let f = open_options
-            .async_open(&p)
+            .async_open(tmp_path.as_ref().unwrap_or(&target_path))
             .await
             .map_err(new_std_io_error)?;
 
-        Ok((RpWrite::new(), HdfsWriter::new(f)))
+        Ok((
+            RpWrite::new(),
+            HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
+        ))
     }
 
    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
@@ -438,15 +493,29 @@ impl Accessor for HdfsBackend {
     }
 
    fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
-        let p = build_rooted_abs_path(&self.root, path);
+        let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
+            let target_path = build_rooted_abs_path(&self.root, path);
+            let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path));
 
-        if let Err(err) = self.client.metadata(&p) {
+            // If the target file exists, we should append to the end of it directly.
+            if op.append() && self.client.metadata(&target_path).is_ok() {
+                (target_path, None)
+            } else {
+                (target_path, Some(tmp_path))
+            }
+        } else {
+            let p = build_rooted_abs_path(&self.root, path);
+
+            (p, None)
+        };
+
+        if let Err(err) = self.client.metadata(&target_path) {
             // Early return if other error happened.
             if err.kind() != io::ErrorKind::NotFound {
                 return Err(new_std_io_error(err));
             }
 
-            let parent = get_parent(&p);
+            let parent = get_parent(&target_path);
 
             self.client.create_dir(parent).map_err(new_std_io_error)?;
 
@@ -454,7 +523,7 @@ impl Accessor for HdfsBackend {
                 .open_file()
                 .create(true)
                 .write(true)
-                .open(&p)
+                .open(&target_path)
                 .map_err(new_std_io_error)?;
         }
 
@@ -466,9 +535,14 @@ impl Accessor for HdfsBackend {
             open_options.write(true);
         }
 
-        let f = open_options.open(&p).map_err(new_std_io_error)?;
+        let f = open_options
+            .open(tmp_path.as_ref().unwrap_or(&target_path))
+            .map_err(new_std_io_error)?;
 
-        Ok((RpWrite::new(), HdfsWriter::new(f)))
+        Ok((
+            RpWrite::new(),
+            HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)),
+        ))
     }
 
    fn blocking_rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 4990df40a9..a2ddd5c002 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -15,56 +15,110 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use futures::future::BoxFuture;
 use std::io::Write;
 use std::pin::Pin;
-use std::task::Context;
+use std::sync::Arc;
 use std::task::Poll;
+use std::task::{ready, Context};
 
 use async_trait::async_trait;
-use futures::AsyncWrite;
+use futures::{AsyncWrite, AsyncWriteExt, FutureExt};
 
 use crate::raw::*;
 use crate::*;
 
 pub struct HdfsWriter<F> {
-    f: F,
+    target_path: String,
+    tmp_path: Option<String>,
+    f: Option<F>,
+    client: Arc<hdrs::Client>,
+    fut: Option<BoxFuture<'static, Result<()>>>,
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for HdfsWriter.
+unsafe impl<F> Sync for HdfsWriter<F> {}
+
 impl<F> HdfsWriter<F> {
-    pub fn new(f: F) -> Self {
-        Self { f }
+    pub fn new(
+        target_path: String,
+        tmp_path: Option<String>,
+        f: F,
+        client: Arc<hdrs::Client>,
+    ) -> Self {
+        Self {
+            target_path,
+            tmp_path,
+            f: Some(f),
+            client,
+            fut: None,
+        }
     }
 }
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        Pin::new(&mut self.f)
+        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+
+        Pin::new(f)
             .poll_write(cx, bs.chunk())
             .map_err(new_std_io_error)
     }
 
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
+
+            let mut f = self.f.take().expect("HdfsWriter must be initialized");
+            let tmp_path = self.tmp_path.clone();
+            let target_path = self.target_path.clone();
+            // Clone the client so it can be moved into the future.
+            let client = self.client.clone();
+
+            self.fut = Some(Box::pin(async move {
+                f.close().await.map_err(new_std_io_error)?;
+
+                if let Some(tmp_path) = tmp_path {
+                    client
+                        .rename_file(&tmp_path, &target_path)
+                        .map_err(new_std_io_error)?;
+                }
+
+                Ok(())
+            }));
+        }
+    }
+
     fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
         Poll::Ready(Err(Error::new(
             ErrorKind::Unsupported,
             "HdfsWriter doesn't support abort",
         )))
     }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Pin::new(&mut self.f)
-            .poll_close(cx)
-            .map_err(new_std_io_error)
-    }
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
     fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        self.f.write(bs.chunk()).map_err(new_std_io_error)
+        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+        f.write(bs.chunk()).map_err(new_std_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
-        self.f.flush().map_err(new_std_io_error)?;
+        let f = self.f.as_mut().expect("HdfsWriter must be initialized");
+        f.flush().map_err(new_std_io_error)?;
+
+        if let Some(tmp_path) = &self.tmp_path {
+            self.client
+                .rename_file(tmp_path, &self.target_path)
+                .map_err(new_std_io_error)?;
+        }
 
         Ok(())
     }
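
The close paths above both follow the classic temp-file-then-rename recipe:
flush everything to the temp location, then rename it over the target so that
readers never observe a partially written file. As a standalone illustration
of the same idea (using std::fs as a stand-in for the hdrs client; the helper
name and its parameters are hypothetical, not part of this commit):

    use std::fs;
    use std::io::Write;

    // Write data to `tmp`, then atomically move it into place at `target`.
    // A rename within one filesystem replaces the destination atomically,
    // which is what makes the final state all-or-nothing.
    fn atomic_write(target: &str, tmp: &str, data: &[u8]) -> std::io::Result<()> {
        let mut f = fs::File::create(tmp)?;
        f.write_all(data)?;
        f.sync_all()?; // ensure bytes are durable before the rename
        fs::rename(tmp, target)
    }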
