This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5124737f9 feat(io): Implement native LocalFsStorage (#2094)
5124737f9 is described below
commit 5124737f9946ccc61f47542138c7376fde8e3d7d
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Feb 3 17:50:03 2026 -0800
feat(io): Implement native LocalFsStorage (#2094)
## Which issue does this PR close?
This PR depends on #2080
- Closes #2056
## What changes are included in this PR?
- Implemented `LocalFsStorage`, which can be used for testing simple
cases
## Are these changes tested?
Added some unit tests.
---
crates/iceberg/src/io/local_fs.rs | 537 ++++++++++++++++++++++++++++++++++++++
crates/iceberg/src/io/mod.rs | 1 +
2 files changed, 538 insertions(+)
diff --git a/crates/iceberg/src/io/local_fs.rs
b/crates/iceberg/src/io/local_fs.rs
new file mode 100644
index 000000000..0a55199f7
--- /dev/null
+++ b/crates/iceberg/src/io/local_fs.rs
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Local filesystem storage implementation for testing.
+//!
+//! This module provides a `LocalFsStorage` implementation that uses standard
+//! Rust filesystem operations. It is primarily intended for unit testing
+//! scenarios where tests need to read/write files on the local filesystem.
+
+use std::fs;
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::ops::Range;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+
+use super::{
+ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
StorageConfig,
+ StorageFactory,
+};
+use crate::{Error, ErrorKind, Result};
+
+/// Local filesystem storage implementation.
+///
+/// This storage implementation uses standard Rust filesystem operations,
+/// making it suitable for unit tests that need to read/write files on disk.
+///
+/// # Path Normalization
+///
+/// The storage normalizes paths to handle various formats:
+/// - `file:///path/to/file` -> `/path/to/file`
+/// - `file:/path/to/file` -> `/path/to/file`
+/// - `/path/to/file` -> `/path/to/file`
+/// ```
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct LocalFsStorage;
+
+impl LocalFsStorage {
+ /// Create a new `LocalFsStorage` instance.
+ pub fn new() -> Self {
+ Self
+ }
+
+ /// Normalize a path by removing scheme prefixes.
+ ///
+ /// This handles the following formats:
+ /// - `file:///path` -> `/path`
+ /// - `file://path` -> `/path` (treats as absolute)
+ /// - `file:/path` -> `/path`
+ /// - `/path` -> `/path`
+ pub(crate) fn normalize_path(path: &str) -> PathBuf {
+ let path = if let Some(stripped) = path.strip_prefix("file://") {
+ // file:///path -> /path or file://path -> /path
+ if stripped.starts_with('/') {
+ stripped.to_string()
+ } else {
+ format!("/{stripped}")
+ }
+ } else if let Some(stripped) = path.strip_prefix("file:") {
+ // file:/path -> /path
+ if stripped.starts_with('/') {
+ stripped.to_string()
+ } else {
+ format!("/{stripped}")
+ }
+ } else {
+ path.to_string()
+ };
+ PathBuf::from(path)
+ }
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for LocalFsStorage {
+ async fn exists(&self, path: &str) -> Result<bool> {
+ let path = Self::normalize_path(path);
+ Ok(path.exists())
+ }
+
+ async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+ let path = Self::normalize_path(path);
+ let metadata = fs::metadata(&path).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to get metadata for {}: {}", path.display(),
e),
+ )
+ })?;
+ Ok(FileMetadata {
+ size: metadata.len(),
+ })
+ }
+
+ async fn read(&self, path: &str) -> Result<Bytes> {
+ let path = Self::normalize_path(path);
+ let content = fs::read(&path).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to read file {}: {}", path.display(), e),
+ )
+ })?;
+ Ok(Bytes::from(content))
+ }
+
+ async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+ let path = Self::normalize_path(path);
+ let file = fs::File::open(&path).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to open file {}: {}", path.display(), e),
+ )
+ })?;
+ Ok(Box::new(LocalFsFileRead::new(file)))
+ }
+
+ async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+ let path = Self::normalize_path(path);
+
+ // Create parent directories if they don't exist
+ if let Some(parent) = path.parent() {
+ fs::create_dir_all(parent).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to create directory {}: {}",
parent.display(), e),
+ )
+ })?;
+ }
+
+ fs::write(&path, &bs).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to write file {}: {}", path.display(), e),
+ )
+ })?;
+ Ok(())
+ }
+
+ async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+ let path = Self::normalize_path(path);
+
+ // Create parent directories if they don't exist
+ if let Some(parent) = path.parent() {
+ fs::create_dir_all(parent).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to create directory {}: {}",
parent.display(), e),
+ )
+ })?;
+ }
+
+ let file = fs::File::create(&path).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to create file {}: {}", path.display(), e),
+ )
+ })?;
+ Ok(Box::new(LocalFsFileWrite::new(file)))
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ let path = Self::normalize_path(path);
+ if path.exists() {
+ fs::remove_file(&path).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to delete file {}: {}", path.display(), e),
+ )
+ })?;
+ }
+ Ok(())
+ }
+
+ async fn delete_prefix(&self, path: &str) -> Result<()> {
+ let path = Self::normalize_path(path);
+ if path.is_dir() {
+ fs::remove_dir_all(&path).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to delete directory {}: {}",
path.display(), e),
+ )
+ })?;
+ }
+ Ok(())
+ }
+
+ fn new_input(&self, path: &str) -> Result<InputFile> {
+ Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+ }
+
+ fn new_output(&self, path: &str) -> Result<OutputFile> {
+ Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+ }
+}
+
/// Ranged reader over a local file, used by `LocalFsStorage::reader`.
///
/// The handle sits behind a mutex because serving a ranged read moves the
/// file's shared cursor (seek, then read); the lock keeps that pair atomic.
#[derive(Debug)]
pub struct LocalFsFileRead {
    file: std::sync::Mutex<fs::File>,
}

impl LocalFsFileRead {
    /// Wrap an already-opened `file` so it can serve ranged reads.
    pub fn new(file: fs::File) -> Self {
        let file = std::sync::Mutex::new(file);
        Self { file }
    }
}
+
+#[async_trait]
+impl FileRead for LocalFsFileRead {
+ async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+ let mut file = self.file.lock().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire file lock: {e}"),
+ )
+ })?;
+
+ file.seek(SeekFrom::Start(range.start)).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to seek to position {}: {}", range.start, e),
+ )
+ })?;
+
+ let len = (range.end - range.start) as usize;
+ let mut buffer = vec![0u8; len];
+ file.read_exact(&mut buffer).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to read {len} bytes: {e}"),
+ )
+ })?;
+
+ Ok(Bytes::from(buffer))
+ }
+}
+
/// Sequential writer for a local file, used by `LocalFsStorage::writer`.
///
/// The handle is held in an `Option`: closing takes it out, and a `None`
/// handle marks the writer as closed.
#[derive(Debug)]
pub struct LocalFsFileWrite {
    file: Option<fs::File>,
}

impl LocalFsFileWrite {
    /// Wrap an already-created `file` so bytes can be written to it.
    pub fn new(file: fs::File) -> Self {
        let file = Some(file);
        Self { file }
    }
}
+
+#[async_trait]
+impl FileWrite for LocalFsFileWrite {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let file = self
+ .file
+ .as_mut()
+ .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Cannot write to
closed file"))?;
+
+ file.write_all(&bs).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to write to file: {e}"),
+ )
+ })?;
+
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ let file = self
+ .file
+ .take()
+ .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "File already
closed"))?;
+
+ file.sync_all()
+ .map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to
sync file: {e}")))?;
+
+ Ok(())
+ }
+}
+
+/// Factory for creating `LocalFsStorage` instances.
+///
+/// This factory implements `StorageFactory` and creates `LocalFsStorage`
+/// instances for the "file" scheme.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::io::{StorageConfig, StorageFactory, LocalFsStorageFactory};
+///
+/// let factory = LocalFsStorageFactory;
+/// let config = StorageConfig::new();
+/// let storage = factory.build(&config)?;
+/// ```
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct LocalFsStorageFactory;
+
+#[typetag::serde]
+impl StorageFactory for LocalFsStorageFactory {
+ fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+ Ok(Arc::new(LocalFsStorage::new()))
+ }
+}
+
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    #[test]
    fn test_normalize_path() {
        // Test file:/// prefix
        assert_eq!(
            LocalFsStorage::normalize_path("file:///path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test file:// prefix (without leading slash in path)
        assert_eq!(
            LocalFsStorage::normalize_path("file://path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test file:/ prefix
        assert_eq!(
            LocalFsStorage::normalize_path("file:/path/to/file"),
            PathBuf::from("/path/to/file")
        );

        // Test bare path
        assert_eq!(
            LocalFsStorage::normalize_path("/path/to/file"),
            PathBuf::from("/path/to/file")
        );
    }

    #[tokio::test]
    async fn test_local_fs_storage_write_read() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        // Write
        storage.write(path_str, content.clone()).await.unwrap();

        // Read
        let read_content = storage.read(path_str).await.unwrap();
        assert_eq!(read_content, content);
    }

    #[tokio::test]
    async fn test_local_fs_storage_read_missing_file_fails() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("missing.txt");
        let path_str = path.to_str().unwrap();

        // Every read-side operation must surface an error for a missing file.
        assert!(storage.read(path_str).await.is_err());
        assert!(storage.metadata(path_str).await.is_err());
        assert!(storage.reader(path_str).await.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_storage_exists() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        // File doesn't exist initially
        assert!(!storage.exists(path_str).await.unwrap());

        // Write file
        storage.write(path_str, Bytes::from("test")).await.unwrap();

        // File exists now
        assert!(storage.exists(path_str).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_metadata() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let metadata = storage.metadata(path_str).await.unwrap();
        assert_eq!(metadata.size, content.len() as u64);
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage.write(path_str, Bytes::from("test")).await.unwrap();
        assert!(storage.exists(path_str).await.unwrap());

        storage.delete(path_str).await.unwrap();
        assert!(!storage.exists(path_str).await.unwrap());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_missing_file_is_noop() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("missing.txt");

        // Deleting a file that was never written must succeed.
        storage.delete(path.to_str().unwrap()).await.unwrap();
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let dir_path = tmp_dir.path().join("subdir");
        let file1 = dir_path.join("file1.txt");
        let file2 = dir_path.join("file2.txt");

        // Create files in subdirectory
        storage
            .write(file1.to_str().unwrap(), Bytes::from("1"))
            .await
            .unwrap();
        storage
            .write(file2.to_str().unwrap(), Bytes::from("2"))
            .await
            .unwrap();

        // Delete prefix (directory)
        storage
            .delete_prefix(dir_path.to_str().unwrap())
            .await
            .unwrap();

        // Directory should be deleted
        assert!(!dir_path.exists());
    }

    #[tokio::test]
    async fn test_local_fs_storage_delete_prefix_missing_dir_is_noop() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let dir_path = tmp_dir.path().join("missing_dir");

        // Deleting a directory that does not exist must succeed.
        storage
            .delete_prefix(dir_path.to_str().unwrap())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_local_fs_storage_reader() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();
        let content = Bytes::from("Hello, World!");

        storage.write(path_str, content.clone()).await.unwrap();

        let reader = storage.reader(path_str).await.unwrap();
        let read_content = reader.read(0..content.len() as u64).await.unwrap();
        assert_eq!(read_content, content);

        // Test partial read
        let partial = reader.read(0..5).await.unwrap();
        assert_eq!(partial, Bytes::from("Hello"));
    }

    #[tokio::test]
    async fn test_local_fs_file_read_past_eof_fails() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage.write(path_str, Bytes::from("Hello")).await.unwrap();

        let reader = storage.reader(path_str).await.unwrap();
        // Requesting more bytes than the file holds is an error, not a short read.
        assert!(reader.read(0..10).await.is_err());
        // An empty range is valid and yields no bytes.
        assert!(reader.read(2..2).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_local_fs_storage_writer() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("Hello, ")).await.unwrap();
        writer.write(Bytes::from("World!")).await.unwrap();
        writer.close().await.unwrap();

        let content = storage.read(path_str).await.unwrap();
        assert_eq!(content, Bytes::from("Hello, World!"));
    }

    #[tokio::test]
    async fn test_local_fs_storage_writer_truncates_existing_file() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        storage
            .write(path_str, Bytes::from("Hello, World!"))
            .await
            .unwrap();

        // Opening a writer replaces previous content rather than appending to it.
        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("Hi")).await.unwrap();
        writer.close().await.unwrap();

        let content = storage.read(path_str).await.unwrap();
        assert_eq!(content, Bytes::from("Hi"));
    }

    #[tokio::test]
    async fn test_local_fs_file_write_double_close() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.write(Bytes::from("test")).await.unwrap();
        writer.close().await.unwrap();

        // Second close should fail
        let result = writer.close().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_local_fs_file_write_after_close() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("test.txt");
        let path_str = path.to_str().unwrap();

        let mut writer = storage.writer(path_str).await.unwrap();
        writer.close().await.unwrap();

        // Write after close should fail
        let result = writer.write(Bytes::from("test")).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_local_fs_storage_factory() {
        let factory = LocalFsStorageFactory;
        let config = StorageConfig::new();
        let storage = factory.build(&config).unwrap();

        // Verify we got a valid storage instance
        assert!(format!("{storage:?}").contains("LocalFsStorage"));
    }

    #[tokio::test]
    async fn test_local_fs_creates_parent_directories() {
        let tmp_dir = TempDir::new().unwrap();
        let storage = LocalFsStorage::new();
        let path = tmp_dir.path().join("a/b/c/test.txt");
        let path_str = path.to_str().unwrap();

        // Write should create parent directories
        storage.write(path_str, Bytes::from("test")).await.unwrap();

        assert!(path.exists());
    }
}
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 888b868e3..8c76bcbcc 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -68,6 +68,7 @@
mod config;
mod file_io;
+mod local_fs;
mod opendal;
mod storage;