This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ef683a  feat: Add file io (#23)
5ef683a is described below

commit 5ef683a0212710363f21afd902a9fbb9b234add0
Author: Xuanwo <[email protected]>
AuthorDate: Tue Jul 30 17:19:06 2024 +0800

    feat: Add file io (#23)
---
 crates/paimon/Cargo.toml                |   3 +-
 crates/paimon/src/error.rs              |  16 ++-
 crates/paimon/src/io/file_io.rs         | 186 ++++++++++++++++++++++++++++++++
 crates/paimon/src/{lib.rs => io/mod.rs} |   4 +-
 crates/paimon/src/lib.rs                |   4 +
 5 files changed, 207 insertions(+), 6 deletions(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 1e33ce0..0af0b17 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -28,8 +28,9 @@ version.workspace = true
 
 [dependencies]
 bitflags = "2.6.0"
-chrono = {version = "0.4.38", features = ["serde"]}
+chrono = { version = "0.4.38", features = ["serde"] }
 serde = { version = "1", features = ["derive"] }
 serde_with = "3.8.3"
 snafu = "0.8.3"
 typed-builder = "^0.18"
+opendal = "0.48"
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index f615649..6323f3f 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -15,16 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use snafu::Snafu;
+use snafu::prelude::*;
+
+/// Result type used in paimon.
+pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// Error type for paimon.
-#[allow(dead_code)]
 #[derive(Debug, Snafu)]
 pub enum Error {
-    #[snafu(display("paimon data invalid for {}: {:?}", message, source))]
+    #[snafu(display("Paimon data invalid for {}: {:?}", message, source))]
     DataInvalid {
         message: String,
         #[snafu(backtrace)]
         source: snafu::Whatever,
     },
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting unexpected error {}: {:?}", message, source)
+    )]
+    IoUnexpected {
+        message: String,
+        source: opendal::Error,
+    },
 }
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
new file mode 100644
index 0000000..5247b4f
--- /dev/null
+++ b/crates/paimon/src/io/file_io.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::*;
+use std::collections::HashMap;
+
+use opendal::services::MemoryConfig;
+use opendal::{Metakey, Operator};
+use snafu::ResultExt;
+
+#[derive(Clone, Debug)]
+pub struct FileIO {
+    op: Operator,
+}
+
+impl FileIO {
+    /// Create a new FileIO.
+    ///
+    /// The input HashMap is paimon-java's [`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
+    ///
+    /// TODO: Support building Operator from HashMap via options.
+    pub fn new(_: HashMap<String, String>) -> Result<Self> {
+        let op = Operator::from_config(MemoryConfig::default())
+            .context(IoUnexpectedSnafu {
+                message: "Failed to create operator".to_string(),
+            })?
+            .finish();
+        Ok(Self { op })
+    }
+
+    /// Create a new input file to read data.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
+    pub fn new_input(&self, path: &str) -> InputFile {
+        InputFile {
+            _op: self.op.clone(),
+            path: path.to_string(),
+        }
+    }
+
+    /// Create a new output file to write data.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
+    pub fn new_output(&self, path: &str) -> OutputFile {
+        OutputFile {
+            _op: self.op.clone(),
+            path: path.to_string(),
+        }
+    }
+
+    /// Return a file status object that represents the path.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
+    pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
+        let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
+            message: "Failed to get file status".to_string(),
+        })?;
+
+        Ok(FileStatus {
+            size: meta.content_length(),
+        })
+    }
+
+    /// List the statuses of the files/directories in the given path if the path is a directory.
+    ///
+    /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L105>
+    ///
+    /// FIXME: how to handle large dir? Better to return a stream instead?
+    pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
+        let entries = self
+            .op
+            .list_with(path)
+            .metakey(Metakey::ContentLength)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: "Failed to list file status".to_string(),
+            })?;
+
+        Ok(entries
+            .into_iter()
+            .map(|meta| FileStatus {
+                size: meta.metadata().content_length(),
+            })
+            .collect())
+    }
+
+    /// Check if exists.
+    ///
+    /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
+    pub async fn exists(&self, path: &str) -> Result<bool> {
+        self.op.is_exist(path).await.context(IoUnexpectedSnafu {
+            message: "Failed to check file existence".to_string(),
+        })
+    }
+
+    /// Delete a file.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
+    pub async fn delete_file(&self, path: &str) -> Result<()> {
+        self.op.delete(path).await.context(IoUnexpectedSnafu {
+            message: "Failed to delete file".to_string(),
+        })?;
+
+        Ok(())
+    }
+
+    /// Delete a dir recursively.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
+    pub async fn delete_dir(&self, path: &str) -> Result<()> {
+        self.op.remove_all(path).await.context(IoUnexpectedSnafu {
+            message: "Failed to delete dir".to_string(),
+        })?;
+        Ok(())
+    }
+
+    /// Make the given file and all non-existent parents into directories.
+    ///
+    /// Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
+    pub async fn mkdirs(&self, path: &str) -> Result<()> {
+        self.op.create_dir(path).await.context(IoUnexpectedSnafu {
+            message: "Failed to create dir".to_string(),
+        })?;
+        Ok(())
+    }
+
+    /// Renames the file/directory src to dst.
+    ///
+    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L159>
+    pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
+        self.op.rename(src, dst).await.context(IoUnexpectedSnafu {
+            message: "Failed to rename file".to_string(),
+        })?;
+        Ok(())
+    }
+}
+
+/// FileStatus represents the status of a file.
+#[derive(Clone, Debug)]
+pub struct FileStatus {
+    pub size: u64,
+}
+
+/// Input file represents a file that can be read from.
+#[derive(Clone, Debug)]
+pub struct InputFile {
+    _op: Operator,
+    path: String,
+}
+
+impl InputFile {
+    /// Get the path of given input file.
+    pub fn path(&self) -> &str {
+        &self.path
+    }
+}
+
+/// Output file represents a file that can be written to.
+#[derive(Clone, Debug)]
+pub struct OutputFile {
+    _op: Operator,
+    path: String,
+}
+
+impl OutputFile {
+    /// Get the path of given output file.
+    pub fn path(&self) -> &str {
+        &self.path
+    }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/io/mod.rs
similarity index 96%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/io/mod.rs
index bf367f0..a9d049b 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -15,5 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod error;
-pub mod spec;
+mod file_io;
+pub use file_io::*;
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index bf367f0..6e15e0b 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -16,4 +16,8 @@
 // under the License.
 
 mod error;
+pub use error::Error;
+pub use error::Result;
+
+pub mod io;
 pub mod spec;

Reply via email to