This is an automated email from the ASF dual-hosted git repository.
aitozi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8a2ea68 feat(io): Implement base IO Module (#51)
8a2ea68 is described below
commit 8a2ea68ea485ef6f90944fc9de7c5ea371cd22c2
Author: Huanbing <[email protected]>
AuthorDate: Sat Aug 17 19:52:55 2024 +0800
feat(io): Implement base IO Module (#51)
---
crates/paimon/Cargo.toml | 11 +
crates/paimon/src/error.rs | 20 +
crates/paimon/src/io/file_io.rs | 534 ++++++++++++++++++---
crates/paimon/src/io/mod.rs | 13 +
crates/paimon/src/io/storage.rs | 81 ++++
crates/paimon/src/io/{mod.rs => storage_fs.rs} | 14 +-
crates/paimon/src/io/{mod.rs => storage_memory.rs} | 10 +-
crates/paimon/src/spec/types.rs | 2 +-
8 files changed, 612 insertions(+), 73 deletions(-)
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index 9e741fa..9e22e5b 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -26,8 +26,19 @@ edition.workspace = true
license.workspace = true
version.workspace = true
+[features]
+default = ["storage-memory", "storage-fs"]
+storage-all = ["storage-memory", "storage-fs"]
+
+storage-memory = ["opendal/services-memory"]
+storage-fs = ["opendal/services-fs"]
+
[dependencies]
+url = "2.5.2"
+async-trait = "0.1.81"
+bytes = "1.7.1"
bitflags = "2.6.0"
+tokio = { version = "1.39.2", features = ["macros"] }
chrono = { version = "0.4.38", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11.15"
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 93404da..f42b465 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -42,4 +42,24 @@ pub enum Error {
message: String,
source: opendal::Error,
},
+ #[snafu(
+ visibility(pub(crate)),
+ display("Paimon hitting unsupported io error {}", message)
+ )]
+ IoUnsupported { message: String },
+ #[snafu(
+ visibility(pub(crate)),
+ display("Paimon hitting invalid config: {}", message)
+ )]
+ ConfigInvalid { message: String },
+}
+
+/// Wraps any [`opendal::Error`] in [`Error::IoUnexpected`] with a fixed,
+/// generic message, keeping the original error as `source`.
+impl From<opendal::Error> for Error {
+ fn from(source: opendal::Error) -> Self {
+ // TODO: Simply use IoUnexpected for now
+ Error::IoUnexpected {
+ message: "IO operation failed on underlying storage".to_string(),
+ source,
+ }
+ }
}
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 0d31af7..d9ebc87 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -17,59 +17,69 @@
use crate::error::*;
use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
-use chrono::offset::Utc;
-use chrono::DateTime;
-use opendal::services::Fs;
-use opendal::{Metakey, Operator};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
use snafu::ResultExt;
+use url::Url;
+
+use super::Storage;
#[derive(Clone, Debug)]
pub struct FileIO {
- op: Operator,
+ storage: Arc<Storage>,
}
impl FileIO {
- /// Create a new FileIO.
+ /// Try to infer file io scheme from path.
///
/// The input HashMap is paimon-java's
[`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
- ///
- /// TODO: Support building Operator from HashMap via options.
- pub fn new(_: HashMap<String, String>) -> Result<Self> {
- let op = Operator::new(Fs::default().root("/"))
- .context(IoUnexpectedSnafu {
- message: "Failed to create operator".to_string(),
- })?
- .finish();
- Ok(Self { op })
+ pub fn from_url(path: &str) -> crate::Result<FileIOBuilder> {
+ let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid URL: {}", path),
+ })?;
+
+ Ok(FileIOBuilder::new(url.scheme()))
}
/// Create a new input file to read data.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
- pub fn new_input(&self, path: &str) -> InputFile {
- InputFile {
- _op: self.op.clone(),
- path: path.to_string(),
- }
+ pub fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+ let (op, relative_path) = self.storage.create(path)?;
+ let path = path.to_string();
+ let relative_path_pos = path.len() - relative_path.len();
+ Ok(InputFile {
+ op,
+ path,
+ relative_path_pos,
+ })
}
/// Create a new output file to write data.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
- pub fn new_output(&self, path: &str) -> OutputFile {
- OutputFile {
- _op: self.op.clone(),
- path: path.to_string(),
- }
+ pub fn new_output(&self, path: &str) -> Result<OutputFile> {
+ let (op, relative_path) = self.storage.create(path)?;
+ let path = path.to_string();
+ let relative_path_pos = path.len() - relative_path.len();
+ Ok(OutputFile {
+ op,
+ path,
+ relative_path_pos,
+ })
}
/// Return a file status object that represents the path.
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
- let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
- message: "Failed to get file status".to_string(),
+ let (op, relative_path) = self.storage.create(path)?;
+ let meta = op.stat(relative_path).await.context(IoUnexpectedSnafu {
+ message: format!("Failed to get file status for '{}'", path),
})?;
Ok(FileStatus {
@@ -86,32 +96,35 @@ impl FileIO {
///
/// FIXME: how to handle large dir? Better to return a stream instead?
pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
- let entries = self
- .op
- .list_with(path)
- .metakey(Metakey::ContentLength | Metakey::LastModified)
- .await
- .context(IoUnexpectedSnafu {
- message: "Failed to list file status".to_string(),
- })?;
+ let (op, relative_path) = self.storage.create(path)?;
+
+ let entries = op.list(relative_path).await.context(IoUnexpectedSnafu {
+ message: format!("Failed to list files in '{}'", path),
+ })?;
+
+ // `relative_path` is the suffix of `path` left after `Storage::create`
+ // stripped the scheme prefix. Re-attach that prefix to each entry's own
+ // path so every status reports the entry, not the listed directory.
+ let prefix = &path[..path.len() - relative_path.len()];
+ let mut statuses = Vec::new();
+
+ for entry in entries {
+ let meta = entry.metadata();
+ statuses.push(FileStatus {
+ size: meta.content_length(),
+ is_dir: meta.is_dir(),
+ path: format!("{}{}", prefix, entry.path()),
+ last_modified: meta.last_modified(),
+ });
+ }
- Ok(entries
- .into_iter()
- .map(|meta| FileStatus {
- size: meta.metadata().content_length(),
- is_dir: meta.metadata().is_dir(),
- last_modified: meta.metadata().last_modified(),
- path: format!("{}{}", path, meta.name()),
- })
- .collect())
+ Ok(statuses)
}
/// Check if exists.
///
/// References:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
pub async fn exists(&self, path: &str) -> Result<bool> {
- self.op.is_exist(path).await.context(IoUnexpectedSnafu {
- message: "Failed to check file existence".to_string(),
+ let (op, relative_path) = self.storage.create(path)?;
+
+ op.is_exist(relative_path).await.context(IoUnexpectedSnafu {
+ message: format!("Failed to check existence of '{}'", path),
})
}
@@ -119,8 +132,10 @@ impl FileIO {
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
pub async fn delete_file(&self, path: &str) -> Result<()> {
- self.op.delete(path).await.context(IoUnexpectedSnafu {
- message: "Failed to delete file".to_string(),
+ let (op, relative_path) = self.storage.create(path)?;
+
+ op.delete(relative_path).await.context(IoUnexpectedSnafu {
+ message: format!("Failed to delete file '{}'", path),
})?;
Ok(())
@@ -130,9 +145,14 @@ impl FileIO {
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
pub async fn delete_dir(&self, path: &str) -> Result<()> {
- self.op.remove_all(path).await.context(IoUnexpectedSnafu {
- message: "Failed to delete dir".to_string(),
- })?;
+ let (op, relative_path) = self.storage.create(path)?;
+
+ op.remove_all(relative_path)
+ .await
+ .context(IoUnexpectedSnafu {
+ message: format!("Failed to delete directory '{}'", path),
+ })?;
+
Ok(())
}
@@ -142,9 +162,14 @@ impl FileIO {
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
pub async fn mkdirs(&self, path: &str) -> Result<()> {
- self.op.create_dir(path).await.context(IoUnexpectedSnafu {
- message: "Failed to create dir".to_string(),
- })?;
+ let (op, relative_path) = self.storage.create(path)?;
+
+ op.create_dir(relative_path)
+ .await
+ .context(IoUnexpectedSnafu {
+ message: format!("Failed to create directory '{}'", path),
+ })?;
+
Ok(())
}
@@ -152,14 +177,101 @@ impl FileIO {
///
/// Reference:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L159>
pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
- self.op.rename(src, dst).await.context(IoUnexpectedSnafu {
- message: "Failed to rename file".to_string(),
- })?;
+ let (op_src, relative_path_src) = self.storage.create(src)?;
+ let (_, relative_path_dst) = self.storage.create(dst)?;
+
+ op_src
+ .rename(relative_path_src, relative_path_dst)
+ .await
+ .context(IoUnexpectedSnafu {
+ message: format!("Failed to rename '{}' to '{}'", src, dst),
+ })?;
+
Ok(())
}
}
-/// FileStatus represents the status of a file.
+/// Builder that collects a storage scheme and string properties before
+/// constructing a [`FileIO`].
+#[derive(Debug)]
+pub struct FileIOBuilder {
+ // Scheme string such as the one taken from a URL; `None` when unset.
+ scheme_str: Option<String>,
+ // Free-form key/value properties supplied by the caller.
+ props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+ /// Creates a builder for the given scheme string with no properties.
+ pub fn new(scheme_str: impl ToString) -> Self {
+ Self {
+ scheme_str: Some(scheme_str.to_string()),
+ props: HashMap::default(),
+ }
+ }
+
+ /// Creates a builder with no scheme and no properties.
+ ///
+ /// `into_parts` turns the missing scheme into an empty string.
+ pub fn new_fs_io_builder() -> Self {
+ Self {
+ scheme_str: None,
+ props: HashMap::default(),
+ }
+ }
+
+ /// Consumes the builder and returns its scheme string (empty when unset)
+ /// together with the collected properties.
+ pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
+ (self.scheme_str.unwrap_or_default(), self.props)
+ }
+
+ /// Inserts a single property, converting key and value to `String`.
+ pub fn with_prop(mut self, key: impl ToString, value: impl ToString) ->
Self {
+ self.props.insert(key.to_string(), value.to_string());
+ self
+ }
+
+ /// Inserts every `(key, value)` pair from `args`, converting both to `String`.
+ pub fn with_props(
+ mut self,
+ args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+ ) -> Self {
+ self.props
+ .extend(args.into_iter().map(|e| (e.0.to_string(),
e.1.to_string())));
+ self
+ }
+
+ /// Builds the `Storage` via `Storage::build` and wraps it in an `Arc`
+ /// inside a new [`FileIO`]; any error from `Storage::build` is returned.
+ pub fn build(self) -> crate::Result<FileIO> {
+ let storage = Storage::build(self)?;
+ Ok(FileIO {
+ storage: Arc::new(storage),
+ })
+ }
+}
+
+#[async_trait::async_trait]
+pub trait FileRead: Send + Unpin + 'static {
+ async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
+}
+
+#[async_trait::async_trait]
+impl FileRead for opendal::Reader {
+ /// Reads the bytes in `range` from the underlying opendal reader.
+ ///
+ /// A failed read is returned as an error, converted through
+ /// `From<opendal::Error>`, instead of panicking.
+ async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+ Ok(opendal::Reader::read(self, range).await?.to_bytes())
+ }
+}
+
+#[async_trait::async_trait]
+pub trait FileWrite: Send + Unpin + 'static {
+ async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
+
+ async fn close(&mut self) -> crate::Result<()>;
+}
+
+#[async_trait::async_trait]
+impl FileWrite for opendal::Writer {
+ async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
+ Ok(opendal::Writer::write(self, bs).await?)
+ }
+
+ async fn close(&mut self) -> crate::Result<()> {
+ Ok(opendal::Writer::close(self).await?)
+ }
+}
+
#[derive(Clone, Debug)]
pub struct FileStatus {
pub size: u64,
@@ -168,30 +280,316 @@ pub struct FileStatus {
pub last_modified: Option<DateTime<Utc>>,
}
-/// Input file represents a file that can be read from.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub struct InputFile {
- _op: Operator,
+ op: Operator,
path: String,
+ relative_path_pos: usize,
}
impl InputFile {
- /// Get the path of given input file.
- pub fn path(&self) -> &str {
+ pub fn location(&self) -> &str {
&self.path
}
+
+ pub async fn exists(&self) -> crate::Result<bool> {
+ Ok(self
+ .op
+ .is_exist(&self.path[self.relative_path_pos..])
+ .await?)
+ }
+
+ pub async fn metadata(&self) -> crate::Result<FileStatus> {
+ let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
+
+ Ok(FileStatus {
+ size: meta.content_length(),
+ is_dir: meta.is_dir(),
+ path: self.path.clone(),
+ last_modified: meta.last_modified(),
+ })
+ }
+
+ pub async fn read(&self) -> crate::Result<Bytes> {
+ Ok(self
+ .op
+ .read(&self.path[self.relative_path_pos..])
+ .await?
+ .to_bytes())
+ }
+
+ pub async fn reader(&self) -> crate::Result<impl FileRead> {
+ Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+ }
}
-/// Output file represents a file that can be written to.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub struct OutputFile {
- _op: Operator,
+ op: Operator,
path: String,
+ relative_path_pos: usize,
}
impl OutputFile {
- /// Get the path of given output file.
- pub fn path(&self) -> &str {
+ pub fn location(&self) -> &str {
&self.path
}
+
+ pub async fn exists(&self) -> crate::Result<bool> {
+ Ok(self
+ .op
+ .is_exist(&self.path[self.relative_path_pos..])
+ .await?)
+ }
+
+ pub fn to_input_file(self) -> InputFile {
+ InputFile {
+ op: self.op,
+ path: self.path,
+ relative_path_pos: self.relative_path_pos,
+ }
+ }
+
+ pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
+ let mut writer = self.writer().await?;
+ writer.write(bs).await?;
+ writer.close().await
+ }
+
+ pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
+ Ok(Box::new(
+ self.op.writer(&self.path[self.relative_path_pos..]).await?,
+ ))
+ }
+}
+
+#[cfg(test)]
+mod file_action_test {
+ use std::fs;
+
+ use super::*;
+ use bytes::Bytes;
+
+ fn setup_memory_file_io() -> FileIO {
+ let storage = Storage::Memory;
+ FileIO {
+ storage: Arc::new(storage),
+ }
+ }
+
+ fn setup_fs_file_io() -> FileIO {
+ let storage = Storage::LocalFs;
+ FileIO {
+ storage: Arc::new(storage),
+ }
+ }
+
+ async fn common_test_get_status(file_io: &FileIO, path: &str) {
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ let status = file_io.get_status(path).await.unwrap();
+ assert_eq!(status.size, 11);
+
+ file_io.delete_file(path).await.unwrap();
+ }
+
+ async fn common_test_exists(file_io: &FileIO, path: &str) {
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ let exists = file_io.exists(path).await.unwrap();
+ assert!(exists);
+
+ file_io.delete_file(path).await.unwrap();
+ }
+
+ async fn common_test_delete_file(file_io: &FileIO, path: &str) {
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ file_io.delete_file(path).await.unwrap();
+
+ let exists = file_io.exists(path).await.unwrap();
+ assert!(!exists);
+ }
+
+ // Creates `dir_path` through `FileIO`, checks that it exists, then removes
+ // it from the local filesystem on a best-effort basis.
+ async fn common_test_mkdirs(file_io: &FileIO, dir_path: &str) {
+ file_io.mkdirs(dir_path).await.unwrap();
+
+ let exists = file_io.exists(dir_path).await.unwrap();
+ assert!(exists);
+
+ // Strip only the scheme so the leading '/' survives. Stripping "file:/"
+ // left a path relative to the current working directory instead of the
+ // absolute directory that was created.
+ let _ = fs::remove_dir_all(dir_path.strip_prefix("file:").unwrap());
+ }
+
+ async fn common_test_rename(file_io: &FileIO, src: &str, dst: &str) {
+ let output = file_io.new_output(src).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ file_io.rename(src, dst).await.unwrap();
+
+ let exists_old = file_io.exists(src).await.unwrap();
+ let exists_new = file_io.exists(dst).await.unwrap();
+ assert!(!exists_old);
+ assert!(exists_new);
+
+ file_io.delete_file(dst).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_delete_file_memory() {
+ let file_io = setup_memory_file_io();
+ common_test_delete_file(&file_io,
"memory:/test_file_delete_mem").await;
+ }
+
+ #[tokio::test]
+ async fn test_get_status_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_get_status(&file_io,
"file:/tmp/test_file_get_status_fs").await;
+ }
+
+ #[tokio::test]
+ async fn test_exists_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_exists(&file_io, "file:/tmp/test_file_exists_fs").await;
+ }
+
+ #[tokio::test]
+ async fn test_delete_file_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_delete_file(&file_io,
"file:/tmp/test_file_delete_fs").await;
+ }
+
+ #[tokio::test]
+ async fn test_mkdirs_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_mkdirs(&file_io, "file:/tmp/test_fs_dir/").await;
+ }
+
+ #[tokio::test]
+ async fn test_rename_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_rename(
+ &file_io,
+ "file:/tmp/test_file_fs_z",
+ "file:/tmp/new_test_file_fs_o",
+ )
+ .await;
+ }
+}
+
+#[cfg(test)]
+mod input_output_test {
+ use super::*;
+ use bytes::Bytes;
+
+ fn setup_memory_file_io() -> FileIO {
+ let storage = Storage::Memory;
+ FileIO {
+ storage: Arc::new(storage),
+ }
+ }
+
+ fn setup_fs_file_io() -> FileIO {
+ let storage = Storage::LocalFs;
+ FileIO {
+ storage: Arc::new(storage),
+ }
+ }
+
+ async fn common_test_output_file_write_and_read(file_io: &FileIO, path:
&str) {
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ let input = output.to_input_file();
+ let content = input.read().await.unwrap();
+
+ assert_eq!(&content[..], b"hello world");
+
+ file_io.delete_file(path).await.unwrap();
+ }
+
+ async fn common_test_output_file_exists(file_io: &FileIO, path: &str) {
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ let exists = output.exists().await.unwrap();
+ assert!(exists);
+
+ file_io.delete_file(path).await.unwrap();
+ }
+
+ async fn common_test_input_file_metadata(file_io: &FileIO, path: &str) {
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ let input = output.to_input_file();
+ let metadata = input.metadata().await.unwrap();
+
+ assert_eq!(metadata.size, 11);
+
+ file_io.delete_file(path).await.unwrap();
+ }
+
+ // Writes "hello world" to `path`, then checks that a ranged read of bytes
+ // 0..5 through a reader returns "hello", and finally deletes the file.
+ async fn common_test_input_file_partial_read(file_io: &FileIO, path: &str)
{
+ let output = file_io.new_output(path).unwrap();
+ output.write(Bytes::from("hello world")).await.unwrap();
+
+ let input = output.to_input_file();
+ let reader = input.reader().await.unwrap();
+ let partial_content = reader.read(0..5).await.unwrap(); // read "hello"
+
+ assert_eq!(&partial_content[..], b"hello");
+
+ file_io.delete_file(path).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_output_file_write_and_read_memory() {
+ let file_io = setup_memory_file_io();
+ common_test_output_file_write_and_read(&file_io,
"memory:/test_file_rw_mem").await;
+ }
+
+ #[tokio::test]
+ async fn test_output_file_exists_memory() {
+ let file_io = setup_memory_file_io();
+ common_test_output_file_exists(&file_io,
"memory:/test_file_exist_mem").await;
+ }
+
+ #[tokio::test]
+ async fn test_input_file_metadata_memory() {
+ let file_io = setup_memory_file_io();
+ common_test_input_file_metadata(&file_io,
"memory:/test_file_meta_mem").await;
+ }
+
+ #[tokio::test]
+ async fn test_input_file_partial_read_memory() {
+ let file_io = setup_memory_file_io();
+ common_test_input_file_partial_read(&file_io,
"memory:/test_file_part_read_mem").await;
+ }
+
+ #[tokio::test]
+ async fn test_output_file_write_and_read_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_output_file_write_and_read(&file_io,
"file:/tmp/test_file_fs_rw").await;
+ }
+
+ #[tokio::test]
+ async fn test_output_file_exists_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_output_file_exists(&file_io,
"file:/tmp/test_file_exists").await;
+ }
+
+ #[tokio::test]
+ async fn test_input_file_metadata_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_input_file_metadata(&file_io,
"file:/tmp/test_file_meta").await;
+ }
+
+ #[tokio::test]
+ async fn test_input_file_partial_read_fs() {
+ let file_io = setup_fs_file_io();
+ common_test_input_file_partial_read(&file_io,
"file:/tmp/test_file_read_fs").await;
+ }
}
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs
index a9d049b..a216946 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/mod.rs
@@ -17,3 +17,16 @@
mod file_io;
pub use file_io::*;
+
+mod storage;
+pub use storage::*;
+
+#[cfg(feature = "storage-fs")]
+mod storage_fs;
+#[cfg(feature = "storage-fs")]
+use storage_fs::*;
+
+#[cfg(feature = "storage-memory")]
+mod storage_memory;
+#[cfg(feature = "storage-memory")]
+use storage_memory::*;
diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs
new file mode 100644
index 0000000..14c528c
--- /dev/null
+++ b/crates/paimon/src/io/storage.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use opendal::{Operator, Scheme};
+
+use crate::error;
+
+use super::FileIOBuilder;
+
+/// The storage carries all supported storage services in paimon
+#[derive(Debug)]
+pub enum Storage {
+ #[cfg(feature = "storage-memory")]
+ Memory,
+ #[cfg(feature = "storage-fs")]
+ LocalFs,
+}
+
+impl Storage {
+ /// Selects the storage backend for the scheme carried by `file_io_builder`.
+ ///
+ /// The builder's properties are currently ignored. Returns
+ /// `Error::IoUnsupported` for any scheme without an enabled backend.
+ pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
+ let (scheme_str, _) = file_io_builder.into_parts();
+ let scheme = Self::parse_scheme(&scheme_str)?;
+
+ match scheme {
+ #[cfg(feature = "storage-memory")]
+ Scheme::Memory => Ok(Self::Memory),
+ #[cfg(feature = "storage-fs")]
+ Scheme::Fs => Ok(Self::LocalFs),
+ _ => Err(error::Error::IoUnsupported {
+ message: "Unsupported storage feature".to_string(),
+ }),
+ }
+ }
+
+ /// Builds an operator for this backend and returns it together with the
+ /// part of `path` that remains once the scheme prefix is removed.
+ ///
+ /// The returned path is always a suffix of `path`. A path without the
+ /// scheme prefix only loses a single leading `/`, if it has one.
+ pub(crate) fn create<'a>(&self, path: &'a str) -> crate::Result<(Operator, &'a str)> {
+ match self {
+ #[cfg(feature = "storage-memory")]
+ Storage::Memory => {
+ let op = super::memory_config_build()?;
+
+ // Avoid `&path[1..]`: it panics on an empty path or a multi-byte
+ // first character, and cuts the first character off a relative path.
+ let relative_path = path
+ .strip_prefix("memory:/")
+ .or_else(|| path.strip_prefix('/'))
+ .unwrap_or(path);
+ Ok((op, relative_path))
+ }
+ #[cfg(feature = "storage-fs")]
+ Storage::LocalFs => {
+ let op = super::fs_config_build()?;
+
+ // Same fallback as the memory backend; see the note there.
+ let relative_path = path
+ .strip_prefix("file:/")
+ .or_else(|| path.strip_prefix('/'))
+ .unwrap_or(path);
+ Ok((op, relative_path))
+ }
+ }
+ }
+
+ /// Maps a scheme string to an opendal `Scheme`.
+ ///
+ /// `"memory"` maps to `Scheme::Memory`; `"file"` and the empty string map
+ /// to `Scheme::Fs`; anything else is parsed by opendal, and a parse failure
+ /// is returned as an error.
+ fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
+ match scheme {
+ "memory" => Ok(Scheme::Memory),
+ "file" | "" => Ok(Scheme::Fs),
+ s => Ok(s.parse::<Scheme>()?),
+ }
+ }
+}
diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/storage_fs.rs
similarity index 72%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/io/storage_fs.rs
index a9d049b..ff38d76 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/storage_fs.rs
@@ -15,5 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-mod file_io;
-pub use file_io::*;
+use opendal::services::FsConfig;
+use opendal::Operator;
+
+use crate::Result;
+
+/// Build a new opendal operator for the local filesystem, rooted at `/`.
+///
+/// Returns an error if the operator cannot be created from the config.
+pub(crate) fn fs_config_build() -> Result<Operator> {
+ let mut cfg = FsConfig::default();
+ cfg.root = Some("/".to_string());
+
+ Ok(Operator::from_config(cfg)?.finish())
+}
diff --git a/crates/paimon/src/io/mod.rs
b/crates/paimon/src/io/storage_memory.rs
similarity index 79%
copy from crates/paimon/src/io/mod.rs
copy to crates/paimon/src/io/storage_memory.rs
index a9d049b..ffc082d 100644
--- a/crates/paimon/src/io/mod.rs
+++ b/crates/paimon/src/io/storage_memory.rs
@@ -15,5 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-mod file_io;
-pub use file_io::*;
+use opendal::services::MemoryConfig;
+use opendal::Operator;
+
+use crate::Result;
+
+/// Build a new opendal operator for the in-memory service using its default
+/// configuration.
+///
+/// NOTE(review): a fresh operator is built on every call; presumably separate
+/// in-memory operators do not share data — confirm against callers that
+/// expect data written through one call to be visible to the next.
+pub(crate) fn memory_config_build() -> Result<Operator> {
+ Ok(Operator::from_config(MemoryConfig::default())?.finish())
+}
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index 65ec59e..b253ea4 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -1175,7 +1175,7 @@ mod serde_utils {
{
let s = String::deserialize(deserializer)?;
- let (name, nullable) = s.split_once(" ").unwrap_or((s.as_str(),
""));
+ let (name, nullable) = s.split_once(' ').unwrap_or((s.as_str(),
""));
if name == T::NAME && nullable.is_empty() {
Ok(NullableType::from(true))