This is an automated email from the ASF dual-hosted git repository.

aitozi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a2ea68  feat(io): Implement base IO Module (#51)
8a2ea68 is described below

commit 8a2ea68ea485ef6f90944fc9de7c5ea371cd22c2
Author: Huanbing <[email protected]>
AuthorDate: Sat Aug 17 19:52:55 2024 +0800

    feat(io): Implement base IO Module (#51)
---
 crates/paimon/Cargo.toml                           |  11 +
 crates/paimon/src/error.rs                         |  20 +
 crates/paimon/src/io/file_io.rs                    | 534 ++++++++++++++++++---
 crates/paimon/src/io/mod.rs                        |  13 +
 crates/paimon/src/io/storage.rs                    |  81 ++++
 crates/paimon/src/io/{mod.rs => storage_fs.rs}     |  14 +-
 crates/paimon/src/io/{mod.rs => storage_memory.rs} |  10 +-
 crates/paimon/src/spec/types.rs                    |   2 +-
 8 files changed, 612 insertions(+), 73 deletions(-)

diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 9e741fa..9e22e5b 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -26,8 +26,19 @@ edition.workspace = true
 license.workspace = true
 version.workspace = true
 
+[features]
+default = ["storage-memory", "storage-fs"]
+storage-all = ["storage-memory", "storage-fs"]
+
+storage-memory = ["opendal/services-memory"]
+storage-fs = ["opendal/services-fs"]
+
 [dependencies]
+url = "2.5.2"
+async-trait = "0.1.81"
+bytes = "1.7.1"
 bitflags = "2.6.0"
+tokio = { version = "1.39.2", features = ["macros"] }
 chrono = { version = "0.4.38", features = ["serde"] }
 serde = { version = "1", features = ["derive"] }
 serde_bytes = "0.11.15"
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 93404da..f42b465 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -42,4 +42,24 @@ pub enum Error {
         message: String,
         source: opendal::Error,
     },
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting unsupported io error {}", message)
+    )]
+    IoUnsupported { message: String },
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting invalid config: {}", message)
+    )]
+    ConfigInvalid { message: String },
+}
+
+impl From<opendal::Error> for Error {
+    fn from(source: opendal::Error) -> Self {
+        // TODO: Simply use IoUnexpected for now
+        Error::IoUnexpected {
+            message: "IO operation failed on underlying storage".to_string(),
+            source,
+        }
+    }
 }
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 0d31af7..d9ebc87 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -17,59 +17,69 @@
 
 use crate::error::*;
 use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
 
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
 use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
 
 #[derive(Clone, Debug)]
 pub struct FileIO {
-    op: Operator,
+    storage: Arc<Storage>,
 }
 
 impl FileIO {
-    /// Create a new FileIO.
+    /// Try to infer file io scheme from path.
     ///
     /// The input HashMap is paimon-java's [`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
-    ///
-    /// TODO: Support building Operator from HashMap via options.
-    pub fn new(_: HashMap<String, String>) -> Result<Self> {
-        let op = Operator::new(Fs::default().root("/"))
-            .context(IoUnexpectedSnafu {
-                message: "Failed to create operator".to_string(),
-            })?
-            .finish();
-        Ok(Self { op })
+    pub fn from_url(path: &str) -> crate::Result<FileIOBuilder> {
+        let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+            message: format!("Invalid URL: {}", path),
+        })?;
+
+        Ok(FileIOBuilder::new(url.scheme()))
     }
 
     /// Create a new input file to read data.
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
-    pub fn new_input(&self, path: &str) -> InputFile {
-        InputFile {
-            _op: self.op.clone(),
-            path: path.to_string(),
-        }
+    pub fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+        let (op, relative_path) = self.storage.create(path)?;
+        let path = path.to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(InputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
     }
 
     /// Create a new output file to write data.
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
-    pub fn new_output(&self, path: &str) -> OutputFile {
-        OutputFile {
-            _op: self.op.clone(),
-            path: path.to_string(),
-        }
+    pub fn new_output(&self, path: &str) -> Result<OutputFile> {
+        let (op, relative_path) = self.storage.create(path)?;
+        let path = path.to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(OutputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
     }
 
     /// Return a file status object that represents the path.
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
     pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
-        let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to get file status".to_string(),
+        let (op, relative_path) = self.storage.create(path)?;
+        let meta = op.stat(relative_path).await.context(IoUnexpectedSnafu {
+            message: format!("Failed to get file status for '{}'", path),
         })?;
 
         Ok(FileStatus {
@@ -86,32 +96,35 @@ impl FileIO {
     ///
     /// FIXME: how to handle large dir? Better to return a stream instead?
     pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
-        let entries = self
-            .op
-            .list_with(path)
-            .metakey(Metakey::ContentLength | Metakey::LastModified)
-            .await
-            .context(IoUnexpectedSnafu {
-                message: "Failed to list file status".to_string(),
-            })?;
+        let (op, relative_path) = self.storage.create(path)?;
+
+        let entries = op.list(relative_path).await.context(IoUnexpectedSnafu {
+            message: format!("Failed to list files in '{}'", path),
+        })?;
+
+        let mut statuses = Vec::new();
+
+        for entry in entries {
+            let meta = entry.metadata();
+            statuses.push(FileStatus {
+                size: meta.content_length(),
+                is_dir: meta.is_dir(),
+                path: path.to_string(),
+                last_modified: meta.last_modified(),
+            });
+        }
 
-        Ok(entries
-            .into_iter()
-            .map(|meta| FileStatus {
-                size: meta.metadata().content_length(),
-                is_dir: meta.metadata().is_dir(),
-                last_modified: meta.metadata().last_modified(),
-                path: format!("{}{}", path, meta.name()),
-            })
-            .collect())
+        Ok(statuses)
     }
 
     /// Check if exists.
     ///
     /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
     pub async fn exists(&self, path: &str) -> Result<bool> {
-        self.op.is_exist(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to check file existence".to_string(),
+        let (op, relative_path) = self.storage.create(path)?;
+
+        op.is_exist(relative_path).await.context(IoUnexpectedSnafu {
+            message: format!("Failed to check existence of '{}'", path),
         })
     }
 
@@ -119,8 +132,10 @@ impl FileIO {
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
     pub async fn delete_file(&self, path: &str) -> Result<()> {
-        self.op.delete(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to delete file".to_string(),
+        let (op, relative_path) = self.storage.create(path)?;
+
+        op.delete(relative_path).await.context(IoUnexpectedSnafu {
+            message: format!("Failed to delete file '{}'", path),
         })?;
 
         Ok(())
@@ -130,9 +145,14 @@ impl FileIO {
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
     pub async fn delete_dir(&self, path: &str) -> Result<()> {
-        self.op.remove_all(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to delete dir".to_string(),
-        })?;
+        let (op, relative_path) = self.storage.create(path)?;
+
+        op.remove_all(relative_path)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: format!("Failed to delete directory '{}'", path),
+            })?;
+
         Ok(())
     }
 
@@ -142,9 +162,14 @@ impl FileIO {
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
     pub async fn mkdirs(&self, path: &str) -> Result<()> {
-        self.op.create_dir(path).await.context(IoUnexpectedSnafu {
-            message: "Failed to create dir".to_string(),
-        })?;
+        let (op, relative_path) = self.storage.create(path)?;
+
+        op.create_dir(relative_path)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: format!("Failed to create directory '{}'", path),
+            })?;
+
         Ok(())
     }
 
@@ -152,14 +177,101 @@ impl FileIO {
     ///
     /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L159>
     pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
-        self.op.rename(src, dst).await.context(IoUnexpectedSnafu {
-            message: "Failed to rename file".to_string(),
-        })?;
+        let (op_src, relative_path_src) = self.storage.create(src)?;
+        let (_, relative_path_dst) = self.storage.create(dst)?;
+
+        op_src
+            .rename(relative_path_src, relative_path_dst)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: format!("Failed to rename '{}' to '{}'", src, dst),
+            })?;
+
         Ok(())
     }
 }
 
-/// FileStatus represents the status of a file.
+#[derive(Debug)]
+pub struct FileIOBuilder {
+    scheme_str: Option<String>,
+    props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+    pub fn new(scheme_str: impl ToString) -> Self {
+        Self {
+            scheme_str: Some(scheme_str.to_string()),
+            props: HashMap::default(),
+        }
+    }
+
+    pub fn new_fs_io_builder() -> Self {
+        Self {
+            scheme_str: None,
+            props: HashMap::default(),
+        }
+    }
+
+    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
+        (self.scheme_str.unwrap_or_default(), self.props)
+    }
+
+    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.props.insert(key.to_string(), value.to_string());
+        self
+    }
+
+    pub fn with_props(
+        mut self,
+        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+    ) -> Self {
+        self.props
+            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
+        self
+    }
+
+    pub fn build(self) -> crate::Result<FileIO> {
+        let storage = Storage::build(self)?;
+        Ok(FileIO {
+            storage: Arc::new(storage),
+        })
+    }
+}
+
+#[async_trait::async_trait]
+pub trait FileRead: Send + Unpin + 'static {
+    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
+}
+
+#[async_trait::async_trait]
+impl FileRead for opendal::Reader {
+    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+        // TODO: build an error type
+        Ok(opendal::Reader::read(self, range)
+            .await
+            .expect("read error")
+            .to_bytes())
+    }
+}
+
+#[async_trait::async_trait]
+pub trait FileWrite: Send + Unpin + 'static {
+    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
+
+    async fn close(&mut self) -> crate::Result<()>;
+}
+
+#[async_trait::async_trait]
+impl FileWrite for opendal::Writer {
+    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+        Ok(opendal::Writer::write(self, bs).await?)
+    }
+
+    async fn close(&mut self) -> crate::Result<()> {
+        Ok(opendal::Writer::close(self).await?)
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct FileStatus {
     pub size: u64,
@@ -168,30 +280,316 @@ pub struct FileStatus {
     pub last_modified: Option<DateTime<Utc>>,
 }
 
-/// Input file represents a file that can be read from.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct InputFile {
-    _op: Operator,
+    op: Operator,
     path: String,
+    relative_path_pos: usize,
 }
 
 impl InputFile {
-    /// Get the path of given input file.
-    pub fn path(&self) -> &str {
+    pub fn location(&self) -> &str {
         &self.path
     }
+
+    pub async fn exists(&self) -> crate::Result<bool> {
+        Ok(self
+            .op
+            .is_exist(&self.path[self.relative_path_pos..])
+            .await?)
+    }
+
+    pub async fn metadata(&self) -> crate::Result<FileStatus> {
+        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
+
+        Ok(FileStatus {
+            size: meta.content_length(),
+            is_dir: meta.is_dir(),
+            path: self.path.clone(),
+            last_modified: meta.last_modified(),
+        })
+    }
+
+    pub async fn read(&self) -> crate::Result<Bytes> {
+        Ok(self
+            .op
+            .read(&self.path[self.relative_path_pos..])
+            .await?
+            .to_bytes())
+    }
+
+    pub async fn reader(&self) -> crate::Result<impl FileRead> {
+        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    }
 }
 
-/// Output file represents a file that can be written to.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct OutputFile {
-    _op: Operator,
+    op: Operator,
     path: String,
+    relative_path_pos: usize,
 }
 
 impl OutputFile {
-    /// Get the path of given output file.
-    pub fn path(&self) -> &str {
+    pub fn location(&self) -> &str {
         &self.path
     }
+
+    pub async fn exists(&self) -> crate::Result<bool> {
+        Ok(self
+            .op
+            .is_exist(&self.path[self.relative_path_pos..])
+            .await?)
+    }
+
+    pub fn to_input_file(self) -> InputFile {
+        InputFile {
+            op: self.op,
+            path: self.path,
+            relative_path_pos: self.relative_path_pos,
+        }
+    }
+
+    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
+        let mut writer = self.writer().await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
+        Ok(Box::new(
+            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod file_action_test {
+    use std::fs;
+
+    use super::*;
+    use bytes::Bytes;
+
+    fn setup_memory_file_io() -> FileIO {
+        let storage = Storage::Memory;
+        FileIO {
+            storage: Arc::new(storage),
+        }
+    }
+
+    fn setup_fs_file_io() -> FileIO {
+        let storage = Storage::LocalFs;
+        FileIO {
+            storage: Arc::new(storage),
+        }
+    }
+
+    async fn common_test_get_status(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        let status = file_io.get_status(path).await.unwrap();
+        assert_eq!(status.size, 11);
+
+        file_io.delete_file(path).await.unwrap();
+    }
+
+    async fn common_test_exists(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        let exists = file_io.exists(path).await.unwrap();
+        assert!(exists);
+
+        file_io.delete_file(path).await.unwrap();
+    }
+
+    async fn common_test_delete_file(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        file_io.delete_file(path).await.unwrap();
+
+        let exists = file_io.exists(path).await.unwrap();
+        assert!(!exists);
+    }
+
+    async fn common_test_mkdirs(file_io: &FileIO, dir_path: &str) {
+        file_io.mkdirs(dir_path).await.unwrap();
+
+        let exists = file_io.exists(dir_path).await.unwrap();
+        assert!(exists);
+
+        let _ = fs::remove_dir_all(dir_path.strip_prefix("file:/").unwrap());
+    }
+
+    async fn common_test_rename(file_io: &FileIO, src: &str, dst: &str) {
+        let output = file_io.new_output(src).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        file_io.rename(src, dst).await.unwrap();
+
+        let exists_old = file_io.exists(src).await.unwrap();
+        let exists_new = file_io.exists(dst).await.unwrap();
+        assert!(!exists_old);
+        assert!(exists_new);
+
+        file_io.delete_file(dst).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_delete_file_memory() {
+        let file_io = setup_memory_file_io();
+        common_test_delete_file(&file_io, "memory:/test_file_delete_mem").await;
+    }
+
+    #[tokio::test]
+    async fn test_get_status_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_get_status(&file_io, "file:/tmp/test_file_get_status_fs").await;
+    }
+
+    #[tokio::test]
+    async fn test_exists_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_exists(&file_io, "file:/tmp/test_file_exists_fs").await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_file_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_delete_file(&file_io, "file:/tmp/test_file_delete_fs").await;
+    }
+
+    #[tokio::test]
+    async fn test_mkdirs_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_mkdirs(&file_io, "file:/tmp/test_fs_dir/").await;
+    }
+
+    #[tokio::test]
+    async fn test_rename_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_rename(
+            &file_io,
+            "file:/tmp/test_file_fs_z",
+            "file:/tmp/new_test_file_fs_o",
+        )
+        .await;
+    }
+}
+
+#[cfg(test)]
+mod input_output_test {
+    use super::*;
+    use bytes::Bytes;
+
+    fn setup_memory_file_io() -> FileIO {
+        let storage = Storage::Memory;
+        FileIO {
+            storage: Arc::new(storage),
+        }
+    }
+
+    fn setup_fs_file_io() -> FileIO {
+        let storage = Storage::LocalFs;
+        FileIO {
+            storage: Arc::new(storage),
+        }
+    }
+
+    async fn common_test_output_file_write_and_read(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        let input = output.to_input_file();
+        let content = input.read().await.unwrap();
+
+        assert_eq!(&content[..], b"hello world");
+
+        file_io.delete_file(path).await.unwrap();
+    }
+
+    async fn common_test_output_file_exists(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        let exists = output.exists().await.unwrap();
+        assert!(exists);
+
+        file_io.delete_file(path).await.unwrap();
+    }
+
+    async fn common_test_input_file_metadata(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        let input = output.to_input_file();
+        let metadata = input.metadata().await.unwrap();
+
+        assert_eq!(metadata.size, 11);
+
+        file_io.delete_file(path).await.unwrap();
+    }
+
+    async fn common_test_input_file_partial_read(file_io: &FileIO, path: &str) {
+        let output = file_io.new_output(path).unwrap();
+        output.write(Bytes::from("hello world")).await.unwrap();
+
+        let input = output.to_input_file();
+        let reader = input.reader().await.unwrap();
+        let partial_content = reader.read(0..5).await.unwrap(); // read "hello"
+
+        assert_eq!(&partial_content[..], b"hello");
+
+        file_io.delete_file(path).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_output_file_write_and_read_memory() {
+        let file_io = setup_memory_file_io();
+        common_test_output_file_write_and_read(&file_io, "memory:/test_file_rw_mem").await;
+    }
+
+    #[tokio::test]
+    async fn test_output_file_exists_memory() {
+        let file_io = setup_memory_file_io();
+        common_test_output_file_exists(&file_io, "memory:/test_file_exist_mem").await;
+    }
+
+    #[tokio::test]
+    async fn test_input_file_metadata_memory() {
+        let file_io = setup_memory_file_io();
+        common_test_input_file_metadata(&file_io, "memory:/test_file_meta_mem").await;
+    }
+
+    #[tokio::test]
+    async fn test_input_file_partial_read_memory() {
+        let file_io = setup_memory_file_io();
+        common_test_input_file_partial_read(&file_io, "memory:/test_file_part_read_mem").await;
+    }
+
+    #[tokio::test]
+    async fn test_output_file_write_and_read_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_output_file_write_and_read(&file_io, "file:/tmp/test_file_fs_rw").await;
+    }
+
+    #[tokio::test]
+    async fn test_output_file_exists_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_output_file_exists(&file_io, "file:/tmp/test_file_exists").await;
+    }
+
+    #[tokio::test]
+    async fn test_input_file_metadata_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_input_file_metadata(&file_io, "file:/tmp/test_file_meta").await;
+    }
+
+    #[tokio::test]
+    async fn test_input_file_partial_read_fs() {
+        let file_io = setup_fs_file_io();
+        common_test_input_file_partial_read(&file_io, "file:/tmp/test_file_read_fs").await;
+    }
 }
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index a9d049b..a216946 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -17,3 +17,16 @@
 
 mod file_io;
 pub use file_io::*;
+
+mod storage;
+pub use storage::*;
+
+#[cfg(feature = "storage-fs")]
+mod storage_fs;
+#[cfg(feature = "storage-fs")]
+use storage_fs::*;
+
+#[cfg(feature = "storage-memory")]
+mod storage_memory;
+#[cfg(feature = "storage-memory")]
+use storage_memory::*;
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
new file mode 100644
index 0000000..14c528c
--- /dev/null
+++ b/crates/paimon/src/io/storage.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use opendal::{Operator, Scheme};
+
+use crate::error;
+
+use super::FileIOBuilder;
+
+/// The storage carries all supported storage services in paimon
+#[derive(Debug)]
+pub enum Storage {
+    #[cfg(feature = "storage-memory")]
+    Memory,
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+}
+
+impl Storage {
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
+        let (scheme_str, _) = file_io_builder.into_parts();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            #[cfg(feature = "storage-memory")]
+            Scheme::Memory => Ok(Self::Memory),
+            #[cfg(feature = "storage-fs")]
+            Scheme::Fs => Ok(Self::LocalFs),
+            _ => Err(error::Error::IoUnsupported {
+                message: "Unsupported storage feature".to_string(),
+            }),
+        }
+    }
+
+    pub(crate) fn create<'a>(&self, path: &'a str) -> crate::Result<(Operator, &'a str)> {
+        match self {
+            #[cfg(feature = "storage-memory")]
+            Storage::Memory => {
+                let op = super::memory_config_build()?;
+
+                if let Some(stripped) = path.strip_prefix("memory:/") {
+                    Ok((op, stripped))
+                } else {
+                    Ok((op, &path[1..]))
+                }
+            }
+            #[cfg(feature = "storage-fs")]
+            Storage::LocalFs => {
+                let op = super::fs_config_build()?;
+
+                if let Some(stripped) = path.strip_prefix("file:/") {
+                    Ok((op, stripped))
+                } else {
+                    Ok((op, &path[1..]))
+                }
+            }
+        }
+    }
+
+    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
+        match scheme {
+            "memory" => Ok(Scheme::Memory),
+            "file" | "" => Ok(Scheme::Fs),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+}
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/storage_fs.rs
similarity index 72%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/io/storage_fs.rs
index a9d049b..ff38d76 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/storage_fs.rs
@@ -15,5 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod file_io;
-pub use file_io::*;
+use opendal::services::FsConfig;
+use opendal::Operator;
+
+use crate::Result;
+
+/// Build a new opendal operator for the local filesystem, rooted at `/`.
+pub(crate) fn fs_config_build() -> Result<Operator> {
+    let mut cfg = FsConfig::default();
+    cfg.root = Some("/".to_string());
+
+    Ok(Operator::from_config(cfg)?.finish())
+}
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/storage_memory.rs
similarity index 79%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/io/storage_memory.rs
index a9d049b..ffc082d 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/storage_memory.rs
@@ -15,5 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod file_io;
-pub use file_io::*;
+use opendal::services::MemoryConfig;
+use opendal::Operator;
+
+use crate::Result;
+
+pub(crate) fn memory_config_build() -> Result<Operator> {
+    Ok(Operator::from_config(MemoryConfig::default())?.finish())
+}
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index 65ec59e..b253ea4 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -1175,7 +1175,7 @@ mod serde_utils {
         {
             let s = String::deserialize(deserializer)?;
 
-            let (name, nullable) = s.split_once(" ").unwrap_or((s.as_str(), ""));
+            let (name, nullable) = s.split_once(' ').unwrap_or((s.as_str(), ""));
 
             if name == T::NAME && nullable.is_empty() {
                 Ok(NullableType::from(true))

Reply via email to