This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c55de4  feat: Introduce FileIO (#53)
8c55de4 is described below

commit 8c55de45f50714eb0c7357aa8a4898dfa4dbdaac
Author: Renjie Liu <[email protected]>
AuthorDate: Wed Sep 20 14:15:51 2023 +0800

    feat: Introduce FileIO (#53)
    
    * feat: Introduce FileIO
    
    * Sort
    
    * fix
    
    * Fix typo
    
    * Fix comments
    
    * Rename InputStream to FileRead
    
    * FileWrite trait
---
 crates/iceberg/Cargo.toml   |   5 +
 crates/iceberg/src/error.rs |  12 ++
 crates/iceberg/src/io.rs    | 456 ++++++++++++++++++++++++++++++++++++++++++++
 crates/iceberg/src/lib.rs   |   1 +
 4 files changed, 474 insertions(+)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d025734..007c5c2 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -34,9 +34,11 @@ bitvec = "1.0.1"
 chrono = "0.4"
 derive_builder = "0.12.0"
 either = "1"
+futures = "0.3"
 itertools = "0.11"
 lazy_static = "1"
 once_cell = "1"
+opendal = "0.39"
 ordered-float = "3.7.0"
 rust_decimal = "1.31.0"
 serde = { version = "^1.0", features = ["rc"] }
@@ -44,7 +46,10 @@ serde_bytes = "0.11.8"
 serde_derive = "^1.0"
 serde_json = "^1.0"
 serde_repr = "0.1.16"
+url = "2"
 uuid = "1.4.1"
 
 [dev-dependencies]
 pretty_assertions = "1.4.0"
+tempdir = "0.3"
+tokio = { version = "1", features = ["macros"] }
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 48bdebc..e4ae576 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -295,6 +295,18 @@ define_from_err!(
     "Failure in conversion with avro"
 );
 
+define_from_err!(
+    opendal::Error,
+    ErrorKind::Unexpected,
+    "Failure in doing io operation"
+);
+
+define_from_err!(
+    url::ParseError,
+    ErrorKind::DataInvalid,
+    "Failed to parse url"
+);
+
 /// Helper macro to check arguments.
 ///
 ///
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
new file mode 100644
index 0000000..9d55aac
--- /dev/null
+++ b/crates/iceberg/src/io.rs
@@ -0,0 +1,456 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File io implementation.
+//!
+//! # How to build `FileIO`
+//!
+//! We provide a `FileIOBuilder` to build `FileIO` from scratch. For example:
+//! ```rust
+//! use iceberg::io::{FileIOBuilder, S3_REGION};
+//!
+//! let file_io = FileIOBuilder::new("s3")
+//!     .with_prop(S3_REGION, "us-east-1")
+//!     .build()
+//!     .unwrap();
+//! ```
+//!
+//! # How to use `FileIO`
+//!
+//! Currently `FileIO` provides simple methods for file operations:
+//!
+//! - `delete`: Delete file.
+//! - `is_exist`: Check if file exists.
+//! - `new_input`: Create input file for reading.
+//! - `new_output`: Create output file for writing.
+
+use std::{collections::HashMap, sync::Arc};
+
+use crate::{error::Result, Error, ErrorKind};
+use futures::{AsyncRead, AsyncSeek, AsyncWrite};
+use once_cell::sync::Lazy;
+use opendal::{Operator, Scheme};
+use url::Url;
+
+/// Following are arguments for [s3 file 
io](https://py.iceberg.apache.org/configuration/#s3).
+/// S3 endpoint.
+pub const S3_ENDPOINT: &str = "s3.endpoint";
+/// S3 access key id.
+pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
+/// S3 secret access key.
+pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
+/// S3 region.
+pub const S3_REGION: &str = "s3.region";
+
+/// A mapping from iceberg s3 configuration key to [`opendal::Operator`] 
configuration key.
+static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static str>> = 
Lazy::new(|| {
+    let mut m = HashMap::with_capacity(4);
+    m.insert(S3_ENDPOINT, "endpoint");
+    m.insert(S3_ACCESS_KEY_ID, "access_key_id");
+    m.insert(S3_SECRET_ACCESS_KEY, "secret_access_key");
+    m.insert(S3_REGION, "region");
+
+    m
+});
+
+const DEFAULT_ROOT_PATH: &str = "/";
+/// FileIO implementation, used to manipulate files in underlying storage.
+///
+/// # Note
+///
+/// All path passed to `FileIO` must be absolute path starting with scheme 
string used to construct `FileIO`.
+/// For example, if you construct `FileIO` with `s3a` scheme, then all path 
passed to `FileIO` must start with `s3a://`.
+#[derive(Clone, Debug)]
+pub struct FileIO {
+    inner: Arc<Storage>,
+}
+
+/// Builder for [`FileIO`].
+pub struct FileIOBuilder {
+    /// This is used to infer scheme of operator.
+    ///
+    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) 
will build a local file io.
+    scheme_str: Option<String>,
+    /// Arguments for operator.
+    props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+    /// Creates a new builder with scheme.
+    pub fn new(scheme_str: impl ToString) -> Self {
+        Self {
+            scheme_str: Some(scheme_str.to_string()),
+            props: HashMap::default(),
+        }
+    }
+
+    /// Creates a new builder for local file io.
+    pub fn new_fs_io() -> Self {
+        Self {
+            scheme_str: None,
+            props: HashMap::default(),
+        }
+    }
+
+    /// Add argument for operator.
+    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> 
Self {
+        self.props.insert(key.to_string(), value.to_string());
+        self
+    }
+
+    /// Add multiple arguments for operator.
+    pub fn with_props(
+        mut self,
+        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+    ) -> Self {
+        self.props
+            .extend(args.into_iter().map(|e| (e.0.to_string(), 
e.1.to_string())));
+        self
+    }
+
+    /// Builds [`FileIO`].
+    pub fn build(self) -> Result<FileIO> {
+        let storage = Storage::build(self)?;
+        Ok(FileIO {
+            inner: Arc::new(storage),
+        })
+    }
+}
+
+impl FileIO {
+    /// Deletes file.
+    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    /// Check file exists.
+    pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        Ok(op.is_exist(relative_path).await?)
+    }
+
+    /// Creates input file.
+    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        let path = path.as_ref().to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(InputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
+    }
+
+    /// Creates output file.
+    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        let path = path.as_ref().to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(OutputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
+    }
+}
+
+/// Input file is used for reading from files.
+#[derive(Debug)]
+pub struct InputFile {
+    op: Operator,
+    // Absolute path of file.
+    path: String,
+    // Relative path of file to uri, starts at [`relative_path_pos`]
+    relative_path_pos: usize,
+}
+
+/// Trait for reading file.
+pub trait FileRead: AsyncRead + AsyncSeek {}
+
+impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
+
+impl InputFile {
+    /// Absolute path to root uri.
+    pub fn location(&self) -> &str {
+        &self.path
+    }
+
+    /// Check if file exists.
+    pub async fn exists(&self) -> Result<bool> {
+        Ok(self
+            .op
+            .is_exist(&self.path[self.relative_path_pos..])
+            .await?)
+    }
+
+    /// Creates [`FileRead`] for reading.
+    pub async fn reader(&self) -> Result<impl FileRead> {
+        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    }
+}
+
+/// Trait for writing file.
+pub trait FileWrite: AsyncWrite {}
+
+impl<T> FileWrite for T where T: AsyncWrite {}
+
+/// Output file is used for writing to files.
+#[derive(Debug)]
+pub struct OutputFile {
+    op: Operator,
+    // Absolute path of file.
+    path: String,
+    // Relative path of file to uri, starts at [`relative_path_pos`]
+    relative_path_pos: usize,
+}
+
+impl OutputFile {
+    /// Absolute path to root uri.
+    pub fn location(&self) -> &str {
+        &self.path
+    }
+
+    /// Checks if file exists.
+    pub async fn exists(&self) -> Result<bool> {
+        Ok(self
+            .op
+            .is_exist(&self.path[self.relative_path_pos..])
+            .await?)
+    }
+
+    /// Converts into [`InputFile`].
+    pub fn to_input_file(self) -> InputFile {
+        InputFile {
+            op: self.op,
+            path: self.path,
+            relative_path_pos: self.relative_path_pos,
+        }
+    }
+
+    /// Creates [`FileWrite`] for writing.
+    pub async fn writer(&self) -> Result<impl FileWrite> {
+        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    }
+}
+
+// We introduce this because I don't want to handle unsupported `Scheme` in 
every method.
+#[derive(Debug)]
+enum Storage {
+    LocalFs {
+        op: Operator,
+    },
+    S3 {
+        scheme_str: String,
+        props: HashMap<String, String>,
+    },
+}
+
+impl Storage {
+    /// Creates operator from path.
+    ///
+    /// # Arguments
+    ///
+    /// * path: It should be *absolute* path starting with scheme string used 
to construct [`FileIO`].
+    ///
+    /// # Returns
+    ///
+    /// The return value consists of two parts:
+    ///
+    /// * An [`opendal::Operator`] instance used to operate on file.
+    /// * Relative path to the root uri of [`opendal::Operator`].
+    ///
+    fn create_operator<'a>(&self, path: &'a impl AsRef<str>) -> 
Result<(Operator, &'a str)> {
+        let path = path.as_ref();
+        match self {
+            Storage::LocalFs { op } => {
+                if let Some(stripped) = path.strip_prefix("file:/") {
+                    Ok((op.clone(), stripped))
+                } else {
+                    Ok((op.clone(), &path[1..]))
+                }
+            }
+            Storage::S3 { scheme_str, props } => {
+                let mut props = props.clone();
+                let url = Url::parse(path)?;
+                let bucket = url.host_str().ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {}, missing bucket", path),
+                    )
+                })?;
+
+                props.insert("bucket".to_string(), bucket.to_string());
+
+                let prefix = format!("{}://{}/", scheme_str, bucket);
+                if path.starts_with(&prefix) {
+                    Ok((Operator::via_map(Scheme::S3, props)?, 
&path[prefix.len()..]))
+                } else {
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {}, should start with {}", 
path, prefix),
+                    ))
+                }
+            }
+        }
+    }
+
+    /// Parse scheme.
+    fn parse_scheme(scheme: &str) -> Result<Scheme> {
+        match scheme {
+            "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+
+    /// Convert iceberg config to opendal config.
+    fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
+        let scheme_str = file_io_builder.scheme_str.unwrap_or("".to_string());
+        let scheme = Self::parse_scheme(&scheme_str)?;
+        let mut new_props = HashMap::default();
+        new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
+
+        match scheme {
+            Scheme::Fs => Ok(Self::LocalFs {
+                op: Operator::via_map(Scheme::Fs, new_props)?,
+            }),
+            Scheme::S3 => {
+                for prop in file_io_builder.props {
+                    if let Some(op_key) = 
S3_CONFIG_MAPPING.get(prop.0.as_str()) {
+                        new_props.insert(op_key.to_string(), prop.1);
+                    }
+                }
+
+                Ok(Self::S3 {
+                    scheme_str,
+                    props: new_props,
+                })
+            }
+            _ => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("Constructing file io from scheme: {scheme} not 
supported now",),
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::io::Write;
+
+    use std::{fs::File, path::Path};
+
+    use futures::io::AllowStdIo;
+    use futures::{AsyncReadExt, AsyncWriteExt};
+
+    use tempdir::TempDir;
+
+    use super::{FileIO, FileIOBuilder};
+
+    fn create_local_file_io() -> FileIO {
+        FileIOBuilder::new_fs_io().build().unwrap()
+    }
+
+    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
+        let mut f = File::create(path).unwrap();
+        write!(f, "{s}").unwrap();
+    }
+
+    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
+        let mut f = AllowStdIo::new(File::open(path).unwrap());
+        let mut s = String::new();
+        f.read_to_string(&mut s).await.unwrap();
+        s
+    }
+
+    #[tokio::test]
+    async fn test_local_input_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let content = "Iceberg loves rust.";
+
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), 
file_name);
+        write_to_file(content, &full_path);
+
+        let file_io = create_local_file_io();
+        let input_file = file_io.new_input(&full_path).unwrap();
+
+        assert!(input_file.exists().await.unwrap());
+        // Remove heading slash
+        assert_eq!(&full_path, input_file.location());
+        let read_content = read_from_file(full_path).await;
+
+        assert_eq!(content, &read_content);
+    }
+
+    #[tokio::test]
+    async fn test_delete_local_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let content = "Iceberg loves rust.";
+
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), 
file_name);
+        write_to_file(content, &full_path);
+
+        let file_io = create_local_file_io();
+        assert!(file_io.is_exist(&full_path).await.unwrap());
+        file_io.delete(&full_path).await.unwrap();
+        assert!(!file_io.is_exist(&full_path).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_delete_non_exist_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), 
file_name);
+
+        let file_io = create_local_file_io();
+        assert!(!file_io.is_exist(&full_path).await.unwrap());
+        assert!(file_io.delete(&full_path).await.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_local_output_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let content = "Iceberg loves rust.";
+
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), 
file_name);
+
+        let file_io = create_local_file_io();
+        let output_file = file_io.new_output(&full_path).unwrap();
+
+        assert!(!output_file.exists().await.unwrap());
+        {
+            let mut writer = output_file.writer().await.unwrap();
+            writer.write_all(content.as_bytes()).await.unwrap();
+            writer.close().await.unwrap();
+        }
+
+        assert_eq!(&full_path, output_file.location());
+
+        let read_content = read_from_file(full_path).await;
+
+        assert_eq!(content, &read_content);
+    }
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 5ef9ad3..93413d7 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -28,4 +28,5 @@ pub use error::ErrorKind;
 pub use error::Result;
 
 mod avro;
+pub mod io;
 pub mod spec;

Reply via email to