This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5ef683a feat: Add file io (#23)
5ef683a is described below
commit 5ef683a0212710363f21afd902a9fbb9b234add0
Author: Xuanwo <[email protected]>
AuthorDate: Tue Jul 30 17:19:06 2024 +0800
feat: Add file io (#23)
---
crates/paimon/Cargo.toml | 3 +-
crates/paimon/src/error.rs | 16 ++-
crates/paimon/src/io/file_io.rs | 186 ++++++++++++++++++++++++++++++++
crates/paimon/src/{lib.rs => io/mod.rs} | 4 +-
crates/paimon/src/lib.rs | 4 +
5 files changed, 207 insertions(+), 6 deletions(-)
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 1e33ce0..0af0b17 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -28,8 +28,9 @@ version.workspace = true
[dependencies]
bitflags = "2.6.0"
-chrono = {version = "0.4.38", features = ["serde"]}
+chrono = { version = "0.4.38", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_with = "3.8.3"
snafu = "0.8.3"
typed-builder = "^0.18"
+opendal = "0.48"
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index f615649..6323f3f 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -15,16 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-use snafu::Snafu;
+use snafu::prelude::*;
+
+/// Result type used in paimon.
+pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Error type for paimon.
-#[allow(dead_code)]
#[derive(Debug, Snafu)]
pub enum Error {
- #[snafu(display("paimon data invalid for {}: {:?}", message, source))]
+ #[snafu(display("Paimon data invalid for {}: {:?}", message, source))]
DataInvalid {
message: String,
#[snafu(backtrace)]
source: snafu::Whatever,
},
+ #[snafu(
+ visibility(pub(crate)),
+ display("Paimon hitting unexpected error {}: {:?}", message, source)
+ )]
+ IoUnexpected {
+ message: String,
+ source: opendal::Error,
+ },
}
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
new file mode 100644
index 0000000..5247b4f
--- /dev/null
+++ b/crates/paimon/src/io/file_io.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::*;
+use std::collections::HashMap;
+
+use opendal::services::MemoryConfig;
+use opendal::{Metakey, Operator};
+use snafu::ResultExt;
+
+#[derive(Clone, Debug)]
+pub struct FileIO {
+ op: Operator,
+}
+
+impl FileIO {
+ /// Create a new FileIO.
+ ///
+ /// The input HashMap is paimon-java's [`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
+ ///
+ /// TODO: Support building Operator from HashMap via options.
+ pub fn new(_: HashMap<String, String>) -> Result<Self> {
+ let op = Operator::from_config(MemoryConfig::default())
+ .context(IoUnexpectedSnafu {
+ message: "Failed to create operator".to_string(),
+ })?
+ .finish();
+ Ok(Self { op })
+ }
+
+ /// Create a new input file to read data.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
+ pub fn new_input(&self, path: &str) -> InputFile {
+ InputFile {
+ _op: self.op.clone(),
+ path: path.to_string(),
+ }
+ }
+
+ /// Create a new output file to write data.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
+ pub fn new_output(&self, path: &str) -> OutputFile {
+ OutputFile {
+ _op: self.op.clone(),
+ path: path.to_string(),
+ }
+ }
+
+ /// Return a file status object that represents the path.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
+ pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
+ let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
+ message: "Failed to get file status".to_string(),
+ })?;
+
+ Ok(FileStatus {
+ size: meta.content_length(),
+ })
+ }
+
+ /// List the statuses of the files/directories in the given path if the path is a directory.
+ ///
+ /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L105>
+ ///
+ /// FIXME: how to handle large dir? Better to return a stream instead?
+ pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
+ let entries = self
+ .op
+ .list_with(path)
+ .metakey(Metakey::ContentLength)
+ .await
+ .context(IoUnexpectedSnafu {
+ message: "Failed to list file status".to_string(),
+ })?;
+
+ Ok(entries
+ .into_iter()
+ .map(|meta| FileStatus {
+ size: meta.metadata().content_length(),
+ })
+ .collect())
+ }
+
+ /// Check if exists.
+ ///
+ /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
+ pub async fn exists(&self, path: &str) -> Result<bool> {
+ self.op.is_exist(path).await.context(IoUnexpectedSnafu {
+ message: "Failed to check file existence".to_string(),
+ })
+ }
+
+ /// Delete a file.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
+ pub async fn delete_file(&self, path: &str) -> Result<()> {
+ self.op.delete(path).await.context(IoUnexpectedSnafu {
+ message: "Failed to delete file".to_string(),
+ })?;
+
+ Ok(())
+ }
+
+ /// Delete a dir recursively.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
+ pub async fn delete_dir(&self, path: &str) -> Result<()> {
+ self.op.remove_all(path).await.context(IoUnexpectedSnafu {
+ message: "Failed to delete dir".to_string(),
+ })?;
+ Ok(())
+ }
+
+ /// Make the given file and all non-existent parents into directories.
+ ///
+ /// Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
+ pub async fn mkdirs(&self, path: &str) -> Result<()> {
+ self.op.create_dir(path).await.context(IoUnexpectedSnafu {
+ message: "Failed to create dir".to_string(),
+ })?;
+ Ok(())
+ }
+
+ /// Renames the file/directory src to dst.
+ ///
+ /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L159>
+ pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
+ self.op.rename(src, dst).await.context(IoUnexpectedSnafu {
+ message: "Failed to rename file".to_string(),
+ })?;
+ Ok(())
+ }
+}
+
+/// FileStatus represents the status of a file.
+#[derive(Clone, Debug)]
+pub struct FileStatus {
+ pub size: u64,
+}
+
+/// Input file represents a file that can be read from.
+#[derive(Clone, Debug)]
+pub struct InputFile {
+ _op: Operator,
+ path: String,
+}
+
+impl InputFile {
+ /// Get the path of given input file.
+ pub fn path(&self) -> &str {
+ &self.path
+ }
+}
+
+/// Output file represents a file that can be written to.
+#[derive(Clone, Debug)]
+pub struct OutputFile {
+ _op: Operator,
+ path: String,
+}
+
+impl OutputFile {
+ /// Get the path of given output file.
+ pub fn path(&self) -> &str {
+ &self.path
+ }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/io/mod.rs
similarity index 96%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/io/mod.rs
index bf367f0..a9d049b 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -15,5 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-mod error;
-pub mod spec;
+mod file_io;
+pub use file_io::*;
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index bf367f0..6e15e0b 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -16,4 +16,8 @@
// under the License.
mod error;
+pub use error::Error;
+pub use error::Result;
+
+pub mod io;
pub mod spec;