This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 79518cf67 Make InMemory object store track last modified time for each entry (#3796)
79518cf67 is described below

commit 79518cf67a6dd5fc391e271fd92c0c21ee7e8a74
Author: Alex Huang <[email protected]>
AuthorDate: Sat Mar 4 18:37:06 2023 +0100

    Make InMemory object store track last modified time for each entry (#3796)
    
    * refactor: allow InMemoryUpload to store timestamp
    
    * use new last modified timestamp
---
 object_store/src/memory.rs | 67 +++++++++++++++++++++++++---------------------
 1 file changed, 37 insertions(+), 30 deletions(-)

diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 40eee55a1..1433701e8 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -20,7 +20,7 @@ use crate::MultipartId;
 use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
 use async_trait::async_trait;
 use bytes::Bytes;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
 use futures::{stream::BoxStream, StreamExt};
 use parking_lot::RwLock;
 use snafu::{ensure, OptionExt, Snafu};
@@ -33,6 +33,9 @@ use std::sync::Arc;
 use std::task::Poll;
 use tokio::io::AsyncWrite;
 
+type Entry = (Bytes, DateTime<Utc>);
+type StorageType = Arc<RwLock<BTreeMap<Path, Entry>>>;
+
 /// A specialized `Error` for in-memory object store-related errors
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -73,7 +76,7 @@ impl From<Error> for super::Error {
 /// storage provider.
 #[derive(Debug, Default)]
 pub struct InMemory {
-    storage: Arc<RwLock<BTreeMap<Path, Bytes>>>,
+    storage: StorageType,
 }
 
 impl std::fmt::Display for InMemory {
@@ -85,7 +88,9 @@ impl std::fmt::Display for InMemory {
 #[async_trait]
 impl ObjectStore for InMemory {
     async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.storage.write().insert(location.clone(), bytes);
+        self.storage
+            .write()
+            .insert(location.clone(), (bytes, Utc::now()));
         Ok(())
     }
 
@@ -113,19 +118,19 @@ impl ObjectStore for InMemory {
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
-        let data = self.get_bytes(location).await?;
+        let data = self.entry(location).await?;
 
         Ok(GetResult::Stream(
-            futures::stream::once(async move { Ok(data) }).boxed(),
+            futures::stream::once(async move { Ok(data.0) }).boxed(),
         ))
     }
 
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
-        let data = self.get_bytes(location).await?;
-        ensure!(range.end <= data.len(), OutOfRangeSnafu);
+        let data = self.entry(location).await?;
+        ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
         ensure!(range.start <= range.end, BadRangeSnafu);
 
-        Ok(data.slice(range))
+        Ok(data.0.slice(range))
     }
 
     async fn get_ranges(
@@ -133,24 +138,23 @@ impl ObjectStore for InMemory {
         location: &Path,
         ranges: &[Range<usize>],
     ) -> Result<Vec<Bytes>> {
-        let data = self.get_bytes(location).await?;
+        let data = self.entry(location).await?;
         ranges
             .iter()
             .map(|range| {
-                ensure!(range.end <= data.len(), OutOfRangeSnafu);
+                ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
                 ensure!(range.start <= range.end, BadRangeSnafu);
-                Ok(data.slice(range.clone()))
+                Ok(data.0.slice(range.clone()))
             })
             .collect()
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let last_modified = Utc::now();
-        let bytes = self.get_bytes(location).await?;
+        let entry = self.entry(location).await?;
         Ok(ObjectMeta {
             location: location.clone(),
-            last_modified,
-            size: bytes.len(),
+            last_modified: entry.1,
+            size: entry.0.len(),
         })
     }
 
@@ -165,7 +169,6 @@ impl ObjectStore for InMemory {
     ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
         let root = Path::default();
         let prefix = prefix.unwrap_or(&root);
-        let last_modified = Utc::now();
 
         let storage = self.storage.read();
         let values: Vec<_> = storage
@@ -180,8 +183,8 @@ impl ObjectStore for InMemory {
             .map(|(key, value)| {
                 Ok(ObjectMeta {
                     location: key.clone(),
-                    last_modified,
-                    size: value.len(),
+                    last_modified: value.1,
+                    size: value.0.len(),
                 })
             })
             .collect();
@@ -197,7 +200,6 @@ impl ObjectStore for InMemory {
         let prefix = prefix.unwrap_or(&root);
 
         let mut common_prefixes = BTreeSet::new();
-        let last_modified = Utc::now();
 
         // Only objects in this base level should be returned in the
         // response. Otherwise, we just collect the common prefixes.
@@ -224,8 +226,8 @@ impl ObjectStore for InMemory {
             } else {
                 let object = ObjectMeta {
                     location: k.clone(),
-                    last_modified,
-                    size: v.len(),
+                    last_modified: v.1,
+                    size: v.0.len(),
                 };
                 objects.push(object);
             }
@@ -238,13 +240,15 @@ impl ObjectStore for InMemory {
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        let data = self.get_bytes(from).await?;
-        self.storage.write().insert(to.clone(), data);
+        let data = self.entry(from).await?;
+        self.storage
+            .write()
+            .insert(to.clone(), (data.0, Utc::now()));
         Ok(())
     }
 
     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        let data = self.get_bytes(from).await?;
+        let data = self.entry(from).await?;
         let mut storage = self.storage.write();
         if storage.contains_key(to) {
             return Err(Error::AlreadyExists {
@@ -252,7 +256,7 @@ impl ObjectStore for InMemory {
             }
             .into());
         }
-        storage.insert(to.clone(), data);
+        storage.insert(to.clone(), (data.0, Utc::now()));
         Ok(())
     }
 }
@@ -273,22 +277,23 @@ impl InMemory {
         }
     }
 
-    async fn get_bytes(&self, location: &Path) -> Result<Bytes> {
+    async fn entry(&self, location: &Path) -> Result<(Bytes, DateTime<Utc>)> {
         let storage = self.storage.read();
-        let bytes = storage
+        let value = storage
             .get(location)
             .cloned()
             .context(NoDataInMemorySnafu {
                 path: location.to_string(),
             })?;
-        Ok(bytes)
+
+        Ok(value)
     }
 }
 
 struct InMemoryUpload {
     location: Path,
     data: Vec<u8>,
-    storage: Arc<RwLock<BTreeMap<Path, Bytes>>>,
+    storage: StorageType,
 }
 
 impl AsyncWrite for InMemoryUpload {
@@ -313,7 +318,9 @@ impl AsyncWrite for InMemoryUpload {
         _cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), io::Error>> {
         let data = Bytes::from(std::mem::take(&mut self.data));
-        self.storage.write().insert(self.location.clone(), data);
+        self.storage
+            .write()
+            .insert(self.location.clone(), (data, Utc::now()));
         Poll::Ready(Ok(()))
     }
 }

Reply via email to