This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 8587044bc feat(io): Implement native MemoryStorage (#2097)
8587044bc is described below
commit 8587044bc3f14fe9628df9c5b761cb3d63588825
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Feb 3 22:22:26 2026 -0800
feat(io): Implement native MemoryStorage (#2097)
## Which issue does this PR close?
- Closes #2057
- This depends on #2083
## What changes are included in this PR?
- Add a new MemoryStorage (not used anywhere for now)
## Are these changes tested?
Added unit tests.
---
crates/iceberg/src/io/memory.rs | 597 ++++++++++++++++++++++++++++++++++++++++
crates/iceberg/src/io/mod.rs | 1 +
2 files changed, 598 insertions(+)
diff --git a/crates/iceberg/src/io/memory.rs b/crates/iceberg/src/io/memory.rs
new file mode 100644
index 000000000..39f1f5db9
--- /dev/null
+++ b/crates/iceberg/src/io/memory.rs
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pure Rust in-memory storage implementation for testing.
+//!
+//! This module provides a `MemoryStorage` implementation that stores data
+//! in a thread-safe `HashMap`, without any external dependencies.
+//! It is primarily intended for unit testing and scenarios where persistent
+//! storage is not needed.
+
+use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::{Arc, RwLock};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+
+use super::{
+ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
StorageConfig,
+ StorageFactory,
+};
+use crate::{Error, ErrorKind, Result};
+
+/// In-memory storage implementation.
+///
+/// This storage implementation stores all data in a thread-safe `HashMap`,
+/// making it suitable for unit tests and scenarios where persistent storage
+/// is not needed.
+///
+/// The map lives behind an `Arc`, so cloning a `MemoryStorage` yields a
+/// handle to the same underlying map rather than a copy of its contents.
+///
+/// # Path Normalization
+///
+/// The storage normalizes paths to handle various formats:
+/// - `memory://path/to/file` -> `path/to/file`
+/// - `memory:/path/to/file` -> `path/to/file`
+/// - `/path/to/file` -> `path/to/file`
+/// - `path/to/file` -> `path/to/file`
+///
+/// # Serialization
+///
+/// The `data` field is marked `#[serde(skip)]`, so no file contents are
+/// written when a `MemoryStorage` is serialized. When deserialized, it
+/// creates a new empty instance. This is intentional because in-memory
+/// data cannot be meaningfully serialized across process boundaries.
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct MemoryStorage {
+    /// Shared map from normalized path to file contents. Skipped by serde
+    /// and rebuilt empty through `default_memory_data` on deserialization.
+    #[serde(skip, default = "default_memory_data")]
+    data: Arc<RwLock<HashMap<String, Bytes>>>,
+}
+
+/// Serde `default` hook for the skipped `data` field of `MemoryStorage`.
+///
+/// Returns a fresh, empty, lock-protected map.
+fn default_memory_data() -> Arc<RwLock<HashMap<String, Bytes>>> {
+    Arc::default()
+}
+
+impl MemoryStorage {
+    /// Create a new empty `MemoryStorage` instance.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Normalize a path by removing the scheme prefix and leading slashes.
+    ///
+    /// This handles the following formats:
+    /// - `memory://path` -> `path`
+    /// - `memory:/path` -> `path`
+    /// - `/path` -> `path`
+    /// - `path` -> `path`
+    ///
+    /// At most one scheme prefix is removed, so a remainder that itself
+    /// begins with `memory:/` (for example `memory://memory:/x`) is kept
+    /// as `memory:/x` instead of being collapsed onto the key `x`.
+    pub(crate) fn normalize_path(path: &str) -> String {
+        // Try the longer `memory://` form first; fall back to `memory:/`
+        // only when the first form did not match, so the scheme is never
+        // stripped twice.
+        let without_scheme = path
+            .strip_prefix("memory://")
+            .or_else(|| path.strip_prefix("memory:/"))
+            .unwrap_or(path);
+
+        // Keys are stored without any leading slashes.
+        without_scheme.trim_start_matches('/').to_string()
+    }
+}
+
+#[async_trait]
+#[typetag::serde]
+impl Storage for MemoryStorage {
+ async fn exists(&self, path: &str) -> Result<bool> {
+ let normalized = Self::normalize_path(path);
+ let data = self.data.read().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire read lock: {e}"),
+ )
+ })?;
+ Ok(data.contains_key(&normalized))
+ }
+
+ async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+ let normalized = Self::normalize_path(path);
+ let data = self.data.read().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire read lock: {e}"),
+ )
+ })?;
+ match data.get(&normalized) {
+ Some(bytes) => Ok(FileMetadata {
+ size: bytes.len() as u64,
+ }),
+ None => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("File not found: {path}"),
+ )),
+ }
+ }
+
+ async fn read(&self, path: &str) -> Result<Bytes> {
+ let normalized = Self::normalize_path(path);
+ let data = self.data.read().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire read lock: {e}"),
+ )
+ })?;
+ match data.get(&normalized) {
+ Some(bytes) => Ok(bytes.clone()),
+ None => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("File not found: {path}"),
+ )),
+ }
+ }
+
+ async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+ let normalized = Self::normalize_path(path);
+ let data = self.data.read().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire read lock: {e}"),
+ )
+ })?;
+ match data.get(&normalized) {
+ Some(bytes) => Ok(Box::new(MemoryFileRead::new(bytes.clone()))),
+ None => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("File not found: {path}"),
+ )),
+ }
+ }
+
+ async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+ let normalized = Self::normalize_path(path);
+ let mut data = self.data.write().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire write lock: {e}"),
+ )
+ })?;
+ data.insert(normalized, bs);
+ Ok(())
+ }
+
+ async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+ let normalized = Self::normalize_path(path);
+ Ok(Box::new(MemoryFileWrite::new(
+ self.data.clone(),
+ normalized,
+ )))
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ let normalized = Self::normalize_path(path);
+ let mut data = self.data.write().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire write lock: {e}"),
+ )
+ })?;
+ data.remove(&normalized);
+ Ok(())
+ }
+
+ async fn delete_prefix(&self, path: &str) -> Result<()> {
+ let normalized = Self::normalize_path(path);
+ let prefix = if normalized.ends_with('/') {
+ normalized
+ } else {
+ format!("{normalized}/")
+ };
+
+ let mut data = self.data.write().map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ format!("Failed to acquire write lock: {e}"),
+ )
+ })?;
+
+ // Collect keys to remove (can't modify while iterating)
+ let keys_to_remove: Vec<String> = data
+ .keys()
+ .filter(|k| k.starts_with(&prefix))
+ .cloned()
+ .collect();
+
+ for key in keys_to_remove {
+ data.remove(&key);
+ }
+
+ Ok(())
+ }
+
+ fn new_input(&self, path: &str) -> Result<InputFile> {
+ Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+ }
+
+ fn new_output(&self, path: &str) -> Result<OutputFile> {
+ Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+ }
+}
+
+/// Factory for creating `MemoryStorage` instances.
+///
+/// This factory implements `StorageFactory` and creates `MemoryStorage`
+/// instances. Since the factory is explicitly chosen, no scheme validation
+/// is performed here. The storage itself does not reject paths either: it
+/// only strips a leading `memory://` or `memory:/` and any leading slashes
+/// before using the path as a key.
+#[derive(Clone, Debug, Default, Serialize, Deserialize)]
+pub struct MemoryStorageFactory;
+
+#[typetag::serde]
+impl StorageFactory for MemoryStorageFactory {
+    /// Build a new, empty `MemoryStorage`. The supplied configuration is
+    /// not consulted, and each call returns an independent instance.
+    fn build(&self, _config: &StorageConfig) -> Result<Arc<dyn Storage>> {
+        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());
+        Ok(storage)
+    }
+}
+
+/// File reader for in-memory storage.
+///
+/// Serves range reads from the `Bytes` value it was constructed with.
+#[derive(Debug)]
+pub struct MemoryFileRead {
+    data: Bytes,
+}
+
+impl MemoryFileRead {
+    /// Create a new `MemoryFileRead` with the given data.
+    pub fn new(data: Bytes) -> Self {
+        Self { data }
+    }
+}
+
+#[async_trait]
+impl FileRead for MemoryFileRead {
+    /// Return the bytes in the half-open `range`.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `DataInvalid` error instead of panicking when the range is
+    /// reversed (`start > end`), extends past the end of the data, or does
+    /// not fit in `usize` on the current target.
+    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+        let len = self.data.len();
+
+        // Checked conversion: a plain `as usize` cast would silently
+        // truncate large offsets on targets where `usize` is under 64 bits.
+        let bounds = usize::try_from(range.start)
+            .ok()
+            .zip(usize::try_from(range.end).ok());
+
+        match bounds {
+            // `Bytes::slice` panics unless `start <= end <= len`, so both
+            // conditions are verified before slicing.
+            Some((start, end)) if start <= end && end <= len => Ok(self.data.slice(start..end)),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Range {}..{} is out of bounds for data of length {}",
+                    range.start, range.end, len
+                ),
+            )),
+        }
+    }
+}
+
+/// Buffered writer that targets a single key of the in-memory storage map.
+///
+/// Implements `FileWrite`: each `write` appends to a private buffer, and
+/// `close` publishes the whole buffer into the shared map under `path`.
+/// Until `close` succeeds, the map is left untouched by this writer.
+#[derive(Debug)]
+pub struct MemoryFileWrite {
+    data: Arc<RwLock<HashMap<String, Bytes>>>,
+    path: String,
+    buffer: Vec<u8>,
+    closed: bool,
+}
+
+impl MemoryFileWrite {
+    /// Create an open writer with an empty buffer that will store its
+    /// contents in `data` under the key `path` when closed.
+    pub fn new(data: Arc<RwLock<HashMap<String, Bytes>>>, path: String) -> Self {
+        Self {
+            data,
+            path,
+            buffer: Vec::new(),
+            closed: false,
+        }
+    }
+}
+
+#[async_trait]
+impl FileWrite for MemoryFileWrite {
+    /// Append `bs` to the pending buffer. Fails with `DataInvalid` once the
+    /// writer has been closed.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        if !self.closed {
+            self.buffer.extend_from_slice(&bs);
+            return Ok(());
+        }
+
+        Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Cannot write to closed file",
+        ))
+    }
+
+    /// Publish the buffered bytes under this writer's path, replacing any
+    /// existing entry, and mark the writer closed. A second `close` fails
+    /// with `DataInvalid`.
+    async fn close(&mut self) -> Result<()> {
+        if self.closed {
+            return Err(Error::new(ErrorKind::DataInvalid, "File already closed"));
+        }
+
+        // Take the lock before draining the buffer so that a lock failure
+        // leaves the buffered bytes and the `closed` flag unchanged.
+        let mut files = match self.data.write() {
+            Ok(guard) => guard,
+            Err(e) => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Failed to acquire write lock: {e}"),
+                ));
+            }
+        };
+
+        let contents = Bytes::from(std::mem::take(&mut self.buffer));
+        files.insert(self.path.clone(), contents);
+        self.closed = true;
+        Ok(())
+    }
+}
+
+// Unit tests for the in-memory storage, its reader/writer types and the
+// factory. Each test builds its own `MemoryStorage`, so no state is shared
+// between tests.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Every supported spelling of a path normalizes to the same bare key.
+    #[test]
+    fn test_normalize_path() {
+        // Test memory:// prefix
+        assert_eq!(
+            MemoryStorage::normalize_path("memory://path/to/file"),
+            "path/to/file"
+        );
+
+        // Test memory:/ prefix
+        assert_eq!(
+            MemoryStorage::normalize_path("memory:/path/to/file"),
+            "path/to/file"
+        );
+
+        // Test leading slash
+        assert_eq!(
+            MemoryStorage::normalize_path("/path/to/file"),
+            "path/to/file"
+        );
+
+        // Test bare path
+        assert_eq!(
+            MemoryStorage::normalize_path("path/to/file"),
+            "path/to/file"
+        );
+
+        // Test multiple leading slashes
+        assert_eq!(
+            MemoryStorage::normalize_path("///path/to/file"),
+            "path/to/file"
+        );
+
+        // Test memory:// with leading slash in path
+        assert_eq!(
+            MemoryStorage::normalize_path("memory:///path/to/file"),
+            "path/to/file"
+        );
+    }
+
+    // A whole-file write followed by a whole-file read returns the same bytes.
+    #[tokio::test]
+    async fn test_memory_storage_write_read() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+        let content = Bytes::from("Hello, World!");
+
+        // Write
+        storage.write(path, content.clone()).await.unwrap();
+
+        // Read
+        let read_content = storage.read(path).await.unwrap();
+        assert_eq!(read_content, content);
+    }
+
+    // `exists` is false before the first write and true afterwards.
+    #[tokio::test]
+    async fn test_memory_storage_exists() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+
+        // File doesn't exist initially
+        assert!(!storage.exists(path).await.unwrap());
+
+        // Write file
+        storage.write(path, Bytes::from("test")).await.unwrap();
+
+        // File exists now
+        assert!(storage.exists(path).await.unwrap());
+    }
+
+    // `metadata` reports the byte length of the stored contents.
+    #[tokio::test]
+    async fn test_memory_storage_metadata() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+        let content = Bytes::from("Hello, World!");
+
+        storage.write(path, content.clone()).await.unwrap();
+
+        let metadata = storage.metadata(path).await.unwrap();
+        assert_eq!(metadata.size, content.len() as u64);
+    }
+
+    // `delete` removes a previously written entry.
+    #[tokio::test]
+    async fn test_memory_storage_delete() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+
+        storage.write(path, Bytes::from("test")).await.unwrap();
+        assert!(storage.exists(path).await.unwrap());
+
+        storage.delete(path).await.unwrap();
+        assert!(!storage.exists(path).await.unwrap());
+    }
+
+    // `delete_prefix` removes entries under the prefix and leaves others alone.
+    #[tokio::test]
+    async fn test_memory_storage_delete_prefix() {
+        let storage = MemoryStorage::new();
+
+        // Create multiple files
+        storage
+            .write("memory://dir/file1.txt", Bytes::from("1"))
+            .await
+            .unwrap();
+        storage
+            .write("memory://dir/file2.txt", Bytes::from("2"))
+            .await
+            .unwrap();
+        storage
+            .write("memory://other/file.txt", Bytes::from("3"))
+            .await
+            .unwrap();
+
+        // Delete prefix
+        storage.delete_prefix("memory://dir").await.unwrap();
+
+        // Files in dir should be deleted
+        assert!(!storage.exists("memory://dir/file1.txt").await.unwrap());
+        assert!(!storage.exists("memory://dir/file2.txt").await.unwrap());
+
+        // File in other dir should still exist
+        assert!(storage.exists("memory://other/file.txt").await.unwrap());
+    }
+
+    // The range reader serves both the full range and a leading sub-range.
+    #[tokio::test]
+    async fn test_memory_storage_reader() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+        let content = Bytes::from("Hello, World!");
+
+        storage.write(path, content.clone()).await.unwrap();
+
+        let reader = storage.reader(path).await.unwrap();
+        let read_content = reader.read(0..content.len() as u64).await.unwrap();
+        assert_eq!(read_content, content);
+
+        // Test partial read
+        let partial = reader.read(0..5).await.unwrap();
+        assert_eq!(partial, Bytes::from("Hello"));
+    }
+
+    // Chunks written through the writer are concatenated and visible after close.
+    #[tokio::test]
+    async fn test_memory_storage_writer() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+
+        let mut writer = storage.writer(path).await.unwrap();
+        writer.write(Bytes::from("Hello, ")).await.unwrap();
+        writer.write(Bytes::from("World!")).await.unwrap();
+        writer.close().await.unwrap();
+
+        let content = storage.read(path).await.unwrap();
+        assert_eq!(content, Bytes::from("Hello, World!"));
+    }
+
+    // Closing an already closed writer is an error.
+    #[tokio::test]
+    async fn test_memory_file_write_double_close() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+
+        let mut writer = storage.writer(path).await.unwrap();
+        writer.write(Bytes::from("test")).await.unwrap();
+        writer.close().await.unwrap();
+
+        // Second close should fail
+        let result = writer.close().await;
+        assert!(result.is_err());
+    }
+
+    // Writing to a closed writer is an error.
+    #[tokio::test]
+    async fn test_memory_file_write_after_close() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+
+        let mut writer = storage.writer(path).await.unwrap();
+        writer.close().await.unwrap();
+
+        // Write after close should fail
+        let result = writer.write(Bytes::from("test")).await;
+        assert!(result.is_err());
+    }
+
+    // A range that ends past the stored data is rejected with an error.
+    #[tokio::test]
+    async fn test_memory_file_read_out_of_bounds() {
+        let storage = MemoryStorage::new();
+        let path = "memory://test/file.txt";
+        let content = Bytes::from("Hello");
+
+        storage.write(path, content).await.unwrap();
+
+        let reader = storage.reader(path).await.unwrap();
+        let result = reader.read(0..100).await;
+        assert!(result.is_err());
+    }
+
+    // A JSON round trip of the storage yields an empty instance.
+    #[test]
+    fn test_memory_storage_serialization() {
+        let storage = MemoryStorage::new();
+
+        // Serialize
+        let serialized = serde_json::to_string(&storage).unwrap();
+
+        // Deserialize
+        let deserialized: MemoryStorage = serde_json::from_str(&serialized).unwrap();
+
+        // Deserialized storage should be empty (new instance)
+        assert!(deserialized.data.read().unwrap().is_empty());
+    }
+
+    // The factory builds a storage whose `Debug` output names `MemoryStorage`.
+    #[test]
+    fn test_memory_storage_factory() {
+        let factory = MemoryStorageFactory;
+        let config = StorageConfig::new();
+        let storage = factory.build(&config).unwrap();
+
+        // Verify we got a valid storage instance
+        assert!(format!("{storage:?}").contains("MemoryStorage"));
+    }
+
+    // A factory that went through a JSON round trip can still build a storage.
+    #[test]
+    fn test_memory_storage_factory_serialization() {
+        let factory = MemoryStorageFactory;
+
+        // Serialize
+        let serialized = serde_json::to_string(&factory).unwrap();
+
+        // Deserialize
+        let deserialized: MemoryStorageFactory = serde_json::from_str(&serialized).unwrap();
+
+        // Verify the deserialized factory works
+        let config = StorageConfig::new();
+        let storage = deserialized.build(&config).unwrap();
+        assert!(format!("{storage:?}").contains("MemoryStorage"));
+    }
+
+    // Data written under one path spelling is readable through all the others.
+    #[tokio::test]
+    async fn test_path_normalization_consistency() {
+        let storage = MemoryStorage::new();
+        let content = Bytes::from("test content");
+
+        // Write with one format
+        storage
+            .write("memory://path/to/file", content.clone())
+            .await
+            .unwrap();
+
+        // Read with different formats - all should work
+        assert_eq!(
+            storage.read("memory://path/to/file").await.unwrap(),
+            content
+        );
+        assert_eq!(storage.read("memory:/path/to/file").await.unwrap(), content);
+        assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
+        assert_eq!(storage.read("path/to/file").await.unwrap(), content);
+    }
+}
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 8c76bcbcc..5f65a15fc 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -69,6 +69,7 @@
mod config;
mod file_io;
mod local_fs;
+mod memory;
mod opendal;
mod storage;