This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 79518cf67 Make InMemory object store track last modified time for each entry (#3796)
79518cf67 is described below
commit 79518cf67a6dd5fc391e271fd92c0c21ee7e8a74
Author: Alex Huang <[email protected]>
AuthorDate: Sat Mar 4 18:37:06 2023 +0100
Make InMemory object store track last modified time for each entry (#3796)
* refactor: allow InMemoryUpload to store timestamp
* use new last modified timestamp
---
object_store/src/memory.rs | 67 +++++++++++++++++++++++++---------------------
1 file changed, 37 insertions(+), 30 deletions(-)
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 40eee55a1..1433701e8 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -20,7 +20,7 @@ use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{ensure, OptionExt, Snafu};
@@ -33,6 +33,9 @@ use std::sync::Arc;
use std::task::Poll;
use tokio::io::AsyncWrite;
+type Entry = (Bytes, DateTime<Utc>);
+type StorageType = Arc<RwLock<BTreeMap<Path, Entry>>>;
+
/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@@ -73,7 +76,7 @@ impl From<Error> for super::Error {
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
- storage: Arc<RwLock<BTreeMap<Path, Bytes>>>,
+ storage: StorageType,
}
impl std::fmt::Display for InMemory {
@@ -85,7 +88,9 @@ impl std::fmt::Display for InMemory {
#[async_trait]
impl ObjectStore for InMemory {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.storage.write().insert(location.clone(), bytes);
+ self.storage
+ .write()
+ .insert(location.clone(), (bytes, Utc::now()));
Ok(())
}
@@ -113,19 +118,19 @@ impl ObjectStore for InMemory {
}
async fn get(&self, location: &Path) -> Result<GetResult> {
- let data = self.get_bytes(location).await?;
+ let data = self.entry(location).await?;
Ok(GetResult::Stream(
- futures::stream::once(async move { Ok(data) }).boxed(),
+ futures::stream::once(async move { Ok(data.0) }).boxed(),
))
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
- let data = self.get_bytes(location).await?;
- ensure!(range.end <= data.len(), OutOfRangeSnafu);
+ let data = self.entry(location).await?;
+ ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
ensure!(range.start <= range.end, BadRangeSnafu);
- Ok(data.slice(range))
+ Ok(data.0.slice(range))
}
async fn get_ranges(
@@ -133,24 +138,23 @@ impl ObjectStore for InMemory {
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
- let data = self.get_bytes(location).await?;
+ let data = self.entry(location).await?;
ranges
.iter()
.map(|range| {
- ensure!(range.end <= data.len(), OutOfRangeSnafu);
+ ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
ensure!(range.start <= range.end, BadRangeSnafu);
- Ok(data.slice(range.clone()))
+ Ok(data.0.slice(range.clone()))
})
.collect()
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- let last_modified = Utc::now();
- let bytes = self.get_bytes(location).await?;
+ let entry = self.entry(location).await?;
Ok(ObjectMeta {
location: location.clone(),
- last_modified,
- size: bytes.len(),
+ last_modified: entry.1,
+ size: entry.0.len(),
})
}
@@ -165,7 +169,6 @@ impl ObjectStore for InMemory {
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let root = Path::default();
let prefix = prefix.unwrap_or(&root);
- let last_modified = Utc::now();
let storage = self.storage.read();
let values: Vec<_> = storage
@@ -180,8 +183,8 @@ impl ObjectStore for InMemory {
.map(|(key, value)| {
Ok(ObjectMeta {
location: key.clone(),
- last_modified,
- size: value.len(),
+ last_modified: value.1,
+ size: value.0.len(),
})
})
.collect();
@@ -197,7 +200,6 @@ impl ObjectStore for InMemory {
let prefix = prefix.unwrap_or(&root);
let mut common_prefixes = BTreeSet::new();
- let last_modified = Utc::now();
// Only objects in this base level should be returned in the
// response. Otherwise, we just collect the common prefixes.
@@ -224,8 +226,8 @@ impl ObjectStore for InMemory {
} else {
let object = ObjectMeta {
location: k.clone(),
- last_modified,
- size: v.len(),
+ last_modified: v.1,
+ size: v.0.len(),
};
objects.push(object);
}
@@ -238,13 +240,15 @@ impl ObjectStore for InMemory {
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- let data = self.get_bytes(from).await?;
- self.storage.write().insert(to.clone(), data);
+ let data = self.entry(from).await?;
+ self.storage
+ .write()
+ .insert(to.clone(), (data.0, Utc::now()));
Ok(())
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- let data = self.get_bytes(from).await?;
+ let data = self.entry(from).await?;
let mut storage = self.storage.write();
if storage.contains_key(to) {
return Err(Error::AlreadyExists {
@@ -252,7 +256,7 @@ impl ObjectStore for InMemory {
}
.into());
}
- storage.insert(to.clone(), data);
+ storage.insert(to.clone(), (data.0, Utc::now()));
Ok(())
}
}
@@ -273,22 +277,23 @@ impl InMemory {
}
}
- async fn get_bytes(&self, location: &Path) -> Result<Bytes> {
+ async fn entry(&self, location: &Path) -> Result<(Bytes, DateTime<Utc>)> {
let storage = self.storage.read();
- let bytes = storage
+ let value = storage
.get(location)
.cloned()
.context(NoDataInMemorySnafu {
path: location.to_string(),
})?;
- Ok(bytes)
+
+ Ok(value)
}
}
struct InMemoryUpload {
location: Path,
data: Vec<u8>,
- storage: Arc<RwLock<BTreeMap<Path, Bytes>>>,
+ storage: StorageType,
}
impl AsyncWrite for InMemoryUpload {
@@ -313,7 +318,9 @@ impl AsyncWrite for InMemoryUpload {
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
let data = Bytes::from(std::mem::take(&mut self.data));
- self.storage.write().insert(self.location.clone(), data);
+ self.storage
+ .write()
+ .insert(self.location.clone(), (data, Utc::now()));
Poll::Ready(Ok(()))
}
}