This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 225ea9327 feat(object_store): add PrefixObjectStore (#3329)
225ea9327 is described below
commit 225ea9327cc5b18d7a3b88a38362c8c5893a24b7
Author: Robert Pack <[email protected]>
AuthorDate: Mon Dec 12 16:35:03 2022 +0100
feat(object_store): add PrefixObjectStore (#3329)
* feat(object_store): add PrefixObjectStore
* Apply suggestions from code review
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* chore: PR comments
* refactor: infallible full_path
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
---
object_store/src/lib.rs | 1 +
object_store/src/prefix.rs | 281 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 282 insertions(+)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index ec41f3812..0cd56612e 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -170,6 +170,7 @@ pub mod limit;
pub mod local;
pub mod memory;
pub mod path;
+pub mod prefix;
pub mod throttle;
#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))]
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
new file mode 100644
index 000000000..d61fc2227
--- /dev/null
+++ b/object_store/src/prefix.rs
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store wrapper handling a constant path prefix
+use bytes::Bytes;
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use std::ops::Range;
+use tokio::io::AsyncWrite;
+
+use crate::path::Path;
+use crate::{
+ GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ Result as ObjectStoreResult,
+};
+
+/// An [`ObjectStore`] adapter that transparently places every location it is
+/// given underneath one fixed prefix of the wrapped store.
+#[derive(Debug, Clone)]
+pub struct PrefixObjectStore<T>
+where
+    T: ObjectStore,
+{
+    /// Path prepended to every location before it is handed to `inner`.
+    prefix: Path,
+    /// The wrapped store that performs the actual operations.
+    inner: T,
+}
+
+impl<T: ObjectStore> std::fmt::Display for PrefixObjectStore<T> {
+    /// Formats the store as `PrefixObjectStore(<prefix>)`; the wrapped store
+    /// is not part of the rendered text.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let prefix: &str = self.prefix.as_ref();
+        write!(f, "PrefixObjectStore({})", prefix)
+    }
+}
+
+impl<T: ObjectStore> PrefixObjectStore<T> {
+    /// Wrap `store` so that every operation is carried out beneath `prefix`.
+    pub fn new(store: T, prefix: impl Into<Path>) -> Self {
+        let prefix = prefix.into();
+        Self {
+            inner: store,
+            prefix,
+        }
+    }
+
+    /// Build the location used on the wrapped store: the parts of the
+    /// configured prefix followed by the parts of `location`.
+    fn full_path(&self, location: &Path) -> Path {
+        let leading = self.prefix.parts();
+        let trailing = location.parts();
+        leading.chain(trailing).collect()
+    }
+
+    /// Return `path` relative to the configured prefix, or `None` when
+    /// `prefix_match` reports that `path` does not match the prefix.
+    fn strip_prefix(&self, path: &Path) -> Option<Path> {
+        path.prefix_match(&self.prefix)
+            .map(|remainder| remainder.collect::<Path>())
+    }
+}
+
+#[async_trait::async_trait]
+impl<T: ObjectStore> ObjectStore for PrefixObjectStore<T> {
+ /// Save the provided bytes to the specified location.
+ async fn put(&self, location: &Path, bytes: Bytes) ->
ObjectStoreResult<()> {
+ let full_path = self.full_path(location);
+ self.inner.put(&full_path, bytes).await
+ }
+
+ /// Return the bytes that are stored at the specified location.
+ async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
+ let full_path = self.full_path(location);
+ self.inner.get(&full_path).await
+ }
+
+ /// Return the bytes that are stored at the specified location
+ /// in the given byte range
+ async fn get_range(
+ &self,
+ location: &Path,
+ range: Range<usize>,
+ ) -> ObjectStoreResult<Bytes> {
+ let full_path = self.full_path(location);
+ self.inner.get_range(&full_path, range).await
+ }
+
+ /// Return the metadata for the specified location
+ async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
+ let full_path = self.full_path(location);
+ self.inner.head(&full_path).await.map(|meta| ObjectMeta {
+ last_modified: meta.last_modified,
+ size: meta.size,
+ location:
self.strip_prefix(&meta.location).unwrap_or(meta.location),
+ })
+ }
+
+ /// Delete the object at the specified location.
+ async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
+ let full_path = self.full_path(location);
+ self.inner.delete(&full_path).await
+ }
+
+ /// List all the objects with the given prefix.
+ ///
+ /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a
prefix of `foo/bar/x` but not of
+ /// `foo/bar_baz/x`.
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
+ Ok(self
+ .inner
+ .list(Some(&self.full_path(prefix.unwrap_or(&Path::from("/")))))
+ .await?
+ .map_ok(|meta| ObjectMeta {
+ last_modified: meta.last_modified,
+ size: meta.size,
+ location:
self.strip_prefix(&meta.location).unwrap_or(meta.location),
+ })
+ .boxed())
+ }
+
+ /// List objects with the given prefix and an implementation specific
+ /// delimiter. Returns common prefixes (directories) in addition to object
+ /// metadata.
+ ///
+ /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a
prefix of `foo/bar/x` but not of
+ /// `foo/bar_baz/x`.
+ async fn list_with_delimiter(
+ &self,
+ prefix: Option<&Path>,
+ ) -> ObjectStoreResult<ListResult> {
+ self.inner
+ .list_with_delimiter(Some(
+ &self.full_path(prefix.unwrap_or(&Path::from("/"))),
+ ))
+ .await
+ .map(|lst| ListResult {
+ common_prefixes: lst
+ .common_prefixes
+ .iter()
+ .filter_map(|p| self.strip_prefix(p))
+ .collect(),
+ objects: lst
+ .objects
+ .iter()
+ .filter_map(|meta| {
+ Some(ObjectMeta {
+ last_modified: meta.last_modified,
+ size: meta.size,
+ location: self.strip_prefix(&meta.location)?,
+ })
+ })
+ .collect(),
+ })
+ }
+
+ /// Copy an object from one path to another in the same object store.
+ ///
+ /// If there exists an object at the destination, it will be overwritten.
+ async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+ let full_from = self.full_path(from);
+ let full_to = self.full_path(to);
+ self.inner.copy(&full_from, &full_to).await
+ }
+
+ /// Copy an object from one path to another, only if destination is empty.
+ ///
+ /// Will return an error if the destination already has an object.
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) ->
ObjectStoreResult<()> {
+ let full_from = self.full_path(from);
+ let full_to = self.full_path(to);
+ self.inner.copy_if_not_exists(&full_from, &full_to).await
+ }
+
+ /// Move an object from one path to another in the same object store.
+ ///
+ /// Will return an error if the destination already has an object.
+ async fn rename_if_not_exists(
+ &self,
+ from: &Path,
+ to: &Path,
+ ) -> ObjectStoreResult<()> {
+ let full_from = self.full_path(from);
+ let full_to = self.full_path(to);
+ self.inner.rename_if_not_exists(&full_from, &full_to).await
+ }
+
+ async fn put_multipart(
+ &self,
+ location: &Path,
+ ) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+ let full_path = self.full_path(location);
+ self.inner.put_multipart(&full_path).await
+ }
+
+ async fn abort_multipart(
+ &self,
+ location: &Path,
+ multipart_id: &MultipartId,
+ ) -> ObjectStoreResult<()> {
+ let full_path = self.full_path(location);
+ self.inner.abort_multipart(&full_path, multipart_id).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::local::LocalFileSystem;
+    use crate::test_util::flatten_list_stream;
+    use crate::tests::{
+        copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
+        put_get_delete_list, rename_and_copy, stream_get,
+    };
+
+    use tempfile::TempDir;
+
+    /// Runs the crate's shared store checks against a prefixed local store.
+    #[tokio::test]
+    async fn prefix_test() {
+        let tmp = TempDir::new().unwrap();
+        let local = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
+        let store = PrefixObjectStore::new(local, "prefix");
+
+        put_get_delete_list(&store).await;
+        list_uses_directories_correctly(&store).await;
+        list_with_delimiter(&store).await;
+        rename_and_copy(&store).await;
+        copy_if_not_exists(&store).await;
+        stream_get(&store).await;
+    }
+
+    /// Checks that paths seen through the wrapper are relative to the prefix
+    /// while the wrapped store sees them underneath `prefix/`.
+    #[tokio::test]
+    async fn prefix_test_applies_prefix() {
+        let tmpdir = TempDir::new().unwrap();
+        let raw_store = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
+
+        // Seed a file directly in the unwrapped store, below `prefix/`.
+        let expected_data = Bytes::from("arbitrary data");
+        let absolute = Path::from("prefix/test_file.json");
+        raw_store
+            .put(&absolute, expected_data.clone())
+            .await
+            .unwrap();
+
+        let prefixed = PrefixObjectStore::new(raw_store, "prefix");
+        let relative = Path::from("test_file.json");
+
+        // Listing without a prefix and listing the root must both report the
+        // file by its prefix-relative path.
+        let listed = flatten_list_stream(&prefixed, None).await.unwrap();
+        assert_eq!(listed, &[relative.clone()]);
+
+        let root = Path::from("/");
+        let listed = flatten_list_stream(&prefixed, Some(&root)).await.unwrap();
+        assert_eq!(listed, &[relative.clone()]);
+
+        // Reading through the wrapper yields the seeded content.
+        let fetched = prefixed.get(&relative).await.unwrap();
+        let read_data = fetched.bytes().await.unwrap();
+        assert_eq!(&*read_data, expected_data);
+
+        // Write a second file and delete the first one through the wrapper.
+        let written = Path::from("/test_written.json");
+        prefixed
+            .put(&written, expected_data.clone())
+            .await
+            .unwrap();
+
+        prefixed.delete(&relative).await.unwrap();
+
+        // Inspect the outcome through a fresh, unwrapped view of the directory.
+        let raw_store = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap();
+
+        let err = raw_store.get(&absolute).await.unwrap_err();
+        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+        let absolute = Path::from("prefix/test_written.json");
+        let fetched = raw_store.get(&absolute).await.unwrap();
+        let read_data = fetched.bytes().await.unwrap();
+        assert_eq!(&*read_data, expected_data)
+    }
+}