martin-g commented on code in PR #21392: URL: https://github.com/apache/datafusion/pull/21392#discussion_r3038603797
########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! 
Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. + cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) Review Comment: The leading `/` is a bit confusing — it won't work on Windows. But it is never used for FS operations. It is just used as a key for the cache. I'd suggest renaming it to `to_key()` or at least adding a comment. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. 
+ cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. + fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? 
+ }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { + return Err(ObjectStoreError::Generic { + store: "MmapObjectStore", + source: format!( + "requested range {}..{} out of bounds for \ + file '{}' of size {size}", + range.start, range.end, location, + ) + .into(), + }); + } + Ok(()) + } +} + +impl Default for MmapObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl Display for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MmapObjectStore") + } +} + +impl Debug for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MmapObjectStore") + .field("local", &self.local) + .field("cache_entries", &self.cache.read().len()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for MmapObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult<PutResult> { + let result = self.local.put_opts(location, payload, opts).await; + if result.is_ok() { + self.invalidate(location); + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult<Box<dyn MultipartUpload>> { + self.invalidate(location); Review Comment: Why is the cache invalidated before the local call? In `put_opts()` it is done only after a successful return from `self.local.put_opts()`. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. 
+ cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), Review Comment: There is no upper limit of the cache size. A CLI session that opens several big files will consume memory that won't be evicted. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! 
Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. + cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. 
+ fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? + }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { Review Comment: The range.start is not always `0`. When using [offset](https://github.com/apache/datafusion/pull/21392/changes#diff-5e66236d723d527cb208f8a81a24f18d941618a827e3be39c3e88f75a90a17c5R228) it could also be bigger than the `size` ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. 
+ cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. + fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? 
+ }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { + return Err(ObjectStoreError::Generic { + store: "MmapObjectStore", + source: format!( + "requested range {}..{} out of bounds for \ + file '{}' of size {size}", + range.start, range.end, location, + ) + .into(), + }); + } + Ok(()) + } +} + +impl Default for MmapObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl Display for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MmapObjectStore") + } +} + +impl Debug for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MmapObjectStore") + .field("local", &self.local) + .field("cache_entries", &self.cache.read().len()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for MmapObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult<PutResult> { + let result = self.local.put_opts(location, payload, opts).await; + if result.is_ok() { + self.invalidate(location); + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult<Box<dyn MultipartUpload>> { + self.invalidate(location); + self.local.put_multipart_opts(location, opts).await + } + + /// Handles GET requests + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult<GetResult> { + let has_conditionals = options.if_match.is_some() + || options.if_none_match.is_some() + || options.if_modified_since.is_some() + || 
options.if_unmodified_since.is_some(); + + if has_conditionals || options.head { + return self.local.get_opts(location, options).await; + } + + match &options.range { + None => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + let size_u64 = size as u64; + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once( + async move { Ok(bytes) }, + ))), + meta, + range: 0..size_u64, + attributes: Attributes::default(), + }) + } + Some(get_range) => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + + use object_store::GetRange; + let range: Range<usize> = match get_range { + GetRange::Bounded(r) => { + let start = usize::try_from(r.start).unwrap_or(0); + let end = usize::try_from(r.end).unwrap_or(size); + start..end + } + GetRange::Offset(o) => { + let start = usize::try_from(*o).unwrap_or(0); + start..size + } + GetRange::Suffix(n) => { + let n_usize = usize::try_from(*n).unwrap_or(size); + size.saturating_sub(n_usize)..size + } + }; + + Self::check_range(&range, size, location)?; + let sliced = bytes.slice(range.clone()); + let result_range = (range.start as u64)..(range.end as u64); + + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once( + async move { Ok(sliced) }, + ))), + meta, + range: result_range, + attributes: Attributes::default(), + }) + } + } + } + + /// Returns multiple byte slices + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range<u64>], + ) -> ObjectStoreResult<Vec<Bytes>> { + let bytes = self.get_mmap(location)?; + let size = bytes.len(); + ranges + .iter() + .map(|range| { + let start = usize::try_from(range.start).unwrap_or(0); + let end = usize::try_from(range.end).unwrap_or(size); + let usize_range = start..end; + Self::check_range(&usize_range, size, location)?; + Ok(bytes.slice(usize_range)) + }) + .collect() + } + + fn delete_stream( + &self, + locations: 
BoxStream<'static, ObjectStoreResult<Path>>, + ) -> BoxStream<'static, ObjectStoreResult<Path>> { + let cache = Arc::clone(&self.cache); + let local_stream = self.local.delete_stream(locations); + + Box::pin(local_stream.map(move |result| { + if let Ok(ref path) = result { + cache.write().remove(&Self::to_fs_path(path)); + } + result + })) + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> { + self.local.list(prefix) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> ObjectStoreResult<ListResult> { + self.local.list_with_delimiter(prefix).await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: CopyOptions, + ) -> ObjectStoreResult<()> { + let result = self.local.copy_opts(from, to, options).await; + self.invalidate(from); Review Comment: Shouldn't the invalidations be done only on success? Especially the one for `to`. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files.
+ +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. + cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. 
+ fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? + }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { + return Err(ObjectStoreError::Generic { + store: "MmapObjectStore", + source: format!( + "requested range {}..{} out of bounds for \ + file '{}' of size {size}", + range.start, range.end, location, + ) + .into(), + }); + } + Ok(()) + } +} + +impl Default for MmapObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl Display for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MmapObjectStore") + } +} + +impl Debug for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MmapObjectStore") + .field("local", &self.local) + .field("cache_entries", 
&self.cache.read().len()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for MmapObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult<PutResult> { + let result = self.local.put_opts(location, payload, opts).await; + if result.is_ok() { + self.invalidate(location); + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult<Box<dyn MultipartUpload>> { + self.invalidate(location); + self.local.put_multipart_opts(location, opts).await + } + + /// Handles GET requests + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult<GetResult> { + let has_conditionals = options.if_match.is_some() + || options.if_none_match.is_some() + || options.if_modified_since.is_some() + || options.if_unmodified_since.is_some(); + + if has_conditionals || options.head { + return self.local.get_opts(location, options).await; + } + + match &options.range { + None => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + let size_u64 = size as u64; + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once( + async move { Ok(bytes) }, + ))), + meta, + range: 0..size_u64, + attributes: Attributes::default(), + }) + } + Some(get_range) => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + + use object_store::GetRange; + let range: Range<usize> = match get_range { + GetRange::Bounded(r) => { + let start = usize::try_from(r.start).unwrap_or(0); + let end = usize::try_from(r.end).unwrap_or(size); Review Comment: If `r.end` is bigger than usize::MAX then you truncate it to `size`. But if `end` is smaller than usize::MAX but bigger than `size` then an error is raised at `check_range()`. This is inconsistent. 
########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! 
Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. + cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. 
+ fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? + }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { + return Err(ObjectStoreError::Generic { + store: "MmapObjectStore", + source: format!( + "requested range {}..{} out of bounds for \ + file '{}' of size {size}", + range.start, range.end, location, + ) + .into(), + }); + } + Ok(()) + } +} + +impl Default for MmapObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl Display for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MmapObjectStore") + } +} + +impl Debug for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MmapObjectStore") + .field("local", &self.local) + .field("cache_entries", 
&self.cache.read().len()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for MmapObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult<PutResult> { + let result = self.local.put_opts(location, payload, opts).await; + if result.is_ok() { + self.invalidate(location); + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult<Box<dyn MultipartUpload>> { + self.invalidate(location); + self.local.put_multipart_opts(location, opts).await + } + + /// Handles GET requests + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult<GetResult> { + let has_conditionals = options.if_match.is_some() + || options.if_none_match.is_some() + || options.if_modified_since.is_some() + || options.if_unmodified_since.is_some(); + + if has_conditionals || options.head { + return self.local.get_opts(location, options).await; + } + + match &options.range { + None => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; Review Comment: Would it be a good idea to cache the `meta` alongside the mapped bytes? That would avoid the `stat` syscall made by `self.local.head()` when the bytes are served from the cache. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. 
+ cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. + fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? 
+ }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { + return Err(ObjectStoreError::Generic { + store: "MmapObjectStore", + source: format!( + "requested range {}..{} out of bounds for \ + file '{}' of size {size}", + range.start, range.end, location, + ) + .into(), + }); + } + Ok(()) + } +} + +impl Default for MmapObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl Display for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MmapObjectStore") + } +} + +impl Debug for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MmapObjectStore") + .field("local", &self.local) + .field("cache_entries", &self.cache.read().len()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for MmapObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult<PutResult> { + let result = self.local.put_opts(location, payload, opts).await; + if result.is_ok() { + self.invalidate(location); + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult<Box<dyn MultipartUpload>> { + self.invalidate(location); + self.local.put_multipart_opts(location, opts).await + } + + /// Handles GET requests + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult<GetResult> { + let has_conditionals = options.if_match.is_some() + || options.if_none_match.is_some() + || options.if_modified_since.is_some() + || 
options.if_unmodified_since.is_some(); + + if has_conditionals || options.head { + return self.local.get_opts(location, options).await; + } + + match &options.range { + None => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + let size_u64 = size as u64; + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once( + async move { Ok(bytes) }, + ))), + meta, + range: 0..size_u64, + attributes: Attributes::default(), + }) + } + Some(get_range) => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + + use object_store::GetRange; + let range: Range<usize> = match get_range { + GetRange::Bounded(r) => { + let start = usize::try_from(r.start).unwrap_or(0); + let end = usize::try_from(r.end).unwrap_or(size); + start..end + } + GetRange::Offset(o) => { + let start = usize::try_from(*o).unwrap_or(0); + start..size + } + GetRange::Suffix(n) => { + let n_usize = usize::try_from(*n).unwrap_or(size); + size.saturating_sub(n_usize)..size + } + }; + + Self::check_range(&range, size, location)?; + let sliced = bytes.slice(range.clone()); + let result_range = (range.start as u64)..(range.end as u64); + + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once( + async move { Ok(sliced) }, + ))), + meta, + range: result_range, + attributes: Attributes::default(), + }) + } + } + } + + /// Returns multiple byte slices + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range<u64>], + ) -> ObjectStoreResult<Vec<Bytes>> { + let bytes = self.get_mmap(location)?; Review Comment: Same here: on a cache miss `get_mmap()` performs blocking `std::fs` I/O (`File::open`, `metadata`, `Mmap::map`) on the async executor thread, so consider running it via `tokio::task::spawn_blocking()`. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. 
+ cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. + fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? 
+ }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { + return Err(ObjectStoreError::Generic { + store: "MmapObjectStore", + source: format!( + "requested range {}..{} out of bounds for \ + file '{}' of size {size}", + range.start, range.end, location, + ) + .into(), + }); + } + Ok(()) + } +} + +impl Default for MmapObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl Display for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MmapObjectStore") + } +} + +impl Debug for MmapObjectStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MmapObjectStore") + .field("local", &self.local) + .field("cache_entries", &self.cache.read().len()) + .finish() + } +} + +#[async_trait] +impl ObjectStore for MmapObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> ObjectStoreResult<PutResult> { + let result = self.local.put_opts(location, payload, opts).await; + if result.is_ok() { + self.invalidate(location); + } + result + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> ObjectStoreResult<Box<dyn MultipartUpload>> { + self.invalidate(location); + self.local.put_multipart_opts(location, opts).await + } + + /// Handles GET requests + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult<GetResult> { + let has_conditionals = options.if_match.is_some() + || options.if_none_match.is_some() + || options.if_modified_since.is_some() + || 
options.if_unmodified_since.is_some(); + + if has_conditionals || options.head { + return self.local.get_opts(location, options).await; + } + + match &options.range { + None => { + let bytes = self.get_mmap(location)?; + let meta = self.local.head(location).await?; + let size = bytes.len(); + let size_u64 = size as u64; + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream::once( + async move { Ok(bytes) }, + ))), + meta, + range: 0..size_u64, + attributes: Attributes::default(), + }) + } + Some(get_range) => { + let bytes = self.get_mmap(location)?; Review Comment: `get_mmap()` uses std::io/blocking operations. Consider using tokio::task::spawn_blocking() ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! 
Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. + cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. 
+ fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated + let mmap = unsafe { + Mmap::map(&file).map_err(|e| ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + })? + }; + Bytes::from_owner(mmap) + }; + + let mut cache = self.cache.write(); + Ok(cache.entry(fs_path).or_insert(bytes).clone()) + } + + /// Evicts cache entry for the location + fn invalidate(&self, location: &Path) { + self.cache.write().remove(&Self::to_fs_path(location)); + } + + /// Validates that range lies within [0, size) + fn check_range( + range: &Range<usize>, + size: usize, + location: &Path, + ) -> ObjectStoreResult<()> { + if range.end > size { Review Comment: In addition, a request such as `GetRange::Bounded(10..5)` produces a range with `start > end` (as does `GetRange::Offset(o)` with `o > size`). `check_range` only validates `range.end`, so such a range reaches `bytes.slice()`, which panics when `start > end`. A check should be added for this too. ########## datafusion-cli/src/object_storage/mmap.rs: ########## @@ -0,0 +1,512 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Memory-mapped [`ObjectStore`] for local files. + +// ObjectStoreError is 72+ bytes, unavoidable when implementing ObjectStore +#![allow(clippy::result_large_err)] +//! +//! [`MmapObjectStore`] maps local files into memory for zero copy reads. +//! Files are mapped on first access and cached to avoid blocking I/O. +//! +//! Writes and metadata operations use [`LocalFileSystem`] and clear the cache + +use std::collections::HashMap; +use std::fmt::{Debug, Display, Formatter}; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{self, BoxStream, StreamExt}; +use memmap2::Mmap; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::{ + Attributes, CopyOptions, Error as ObjectStoreError, GetOptions, GetResult, + GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result as ObjectStoreResult, +}; +use parking_lot::RwLock; + +/// An [`ObjectStore`] implementation that memory-maps local files for +/// zero-copy reads. +pub struct MmapObjectStore { + local: LocalFileSystem, + /// Cache of memory mapped files. 
+ cache: Arc<RwLock<HashMap<PathBuf, Bytes>>>, +} + +impl MmapObjectStore { + /// Creates a new `MmapObjectStore` rooted at the filesystem root (`/`) + pub fn new() -> Self { + Self { + local: LocalFileSystem::new(), + cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Converts an `object_store::Path` to an absolute filesystem path. + fn to_fs_path(location: &Path) -> PathBuf { + PathBuf::from(format!("/{}", location.as_ref())) + } + + /// Returns the memory-mapped `Bytes` for `location`. + fn get_mmap(&self, location: &Path) -> ObjectStoreResult<Bytes> { + let fs_path = Self::to_fs_path(location); + + { + let cache = self.cache.read(); + if let Some(bytes) = cache.get(&fs_path) { + return Ok(bytes.clone()); + } + } + + let file = std::fs::File::open(&fs_path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ObjectStoreError::NotFound { + path: location.to_string(), + source: Box::new(e), + } + } else { + ObjectStoreError::Generic { + store: "MmapObjectStore", + source: Box::new(e), + } + } + })?; + + let file_len = file.metadata().map(|m| m.len()).unwrap_or(0); + + let bytes = if file_len == 0 { + Bytes::new() + } else { + // SAFETY: In the CLI context, files are static datasets so they won't be truncated Review Comment: What if the files are modified by an external process? Unless the file is locked, another process could truncate it, and accessing the truncated part of the memory-mapped view would then raise a SIGBUS error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
