This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 94fe6bb4b0 Remove ObjectStore::append (#5016)
94fe6bb4b0 is described below
commit 94fe6bb4b0dde6f00d8853e6bebefd6b55e3f965
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Nov 1 14:33:37 2023 +0000
Remove ObjectStore::append (#5016)
---
object_store/Cargo.toml | 5 --
object_store/src/lib.rs | 31 +----------
object_store/src/limit.rs | 7 ---
object_store/src/local.rs | 126 -------------------------------------------
object_store/src/memory.rs | 99 ----------------------------------
object_store/src/prefix.rs | 6 ---
object_store/src/throttle.rs | 4 --
7 files changed, 1 insertion(+), 277 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index cb820b509a..c8cf4e2802 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -53,11 +53,6 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
-
-[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
-tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util", "fs"] }
-
-[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
[target.'cfg(target_family="unix")'.dev-dependencies]
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 69db9d97bc..1b94f816b1 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -94,8 +94,7 @@
//!
//! This provides some compelling advantages:
//!
-//! * Except where explicitly stated otherwise, operations are atomic, and readers
-//! cannot observe partial and/or failed writes
+//! * All operations are atomic, and readers cannot observe partial and/or failed writes
//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
@@ -559,30 +558,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// vary by object store.
async fn abort_multipart(&self, location: &Path, multipart_id:
&MultipartId) -> Result<()>;
- /// Returns an [`AsyncWrite`] that can be used to append to the object at
`location`
- ///
- /// A new object will be created if it doesn't already exist, otherwise it
will be
- /// opened, with subsequent writes appended to the end.
- ///
- /// This operation cannot be supported by all stores, most use-cases
should prefer
- /// [`ObjectStore::put`] and [`ObjectStore::put_multipart`] for better
portability
- /// and stronger guarantees
- ///
- /// This API is not guaranteed to be atomic, in particular
- ///
- /// * On error, `location` may contain partial data
- /// * Concurrent calls to [`ObjectStore::list`] may return partially
written objects
- /// * Concurrent calls to [`ObjectStore::get`] may return partially
written data
- /// * Concurrent calls to [`ObjectStore::put`] may result in data loss /
corruption
- /// * Concurrent calls to [`ObjectStore::append`] may result in data loss
/ corruption
- ///
- /// Additionally some stores, such as Azure, may only support appending to
objects created
- /// with [`ObjectStore::append`], and not with [`ObjectStore::put`],
[`ObjectStore::copy`], or
- /// [`ObjectStore::put_multipart`]
- async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite +
Unpin + Send>> {
- Err(Error::NotImplemented)
- }
-
/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
@@ -779,10 +754,6 @@ macro_rules! as_ref_impl {
self.as_ref().abort_multipart(location, multipart_id).await
}
- async fn append(&self, location: &Path) -> Result<Box<dyn
AsyncWrite + Unpin + Send>> {
- self.as_ref().append(location).await
- }
-
async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index 39cc605c47..d1363d9a4d 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -94,13 +94,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.abort_multipart(location, multipart_id).await
}
-
- async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite +
Unpin + Send>> {
- let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
- let write = self.inner.append(location).await?;
- Ok(Box::new(PermitWrapper::new(write, permit)))
- }
-
async fn get(&self, location: &Path) -> Result<GetResult> {
let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
let r = self.inner.get(location).await?;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 919baf71b0..1a87dc33c7 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -350,45 +350,6 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite +
Unpin + Send>> {
- // Get the path to the file from the configuration.
- let path = self.config.path_to_filesystem(location)?;
- loop {
- // Create new `OpenOptions`.
- let mut options = tokio::fs::OpenOptions::new();
-
- // Attempt to open the file with the given options.
- match options
- .truncate(false)
- .append(true)
- .create(true)
- .open(&path)
- .await
- {
- // If the file was successfully opened, return it wrapped in a
boxed `AsyncWrite` trait object.
- Ok(file) => return Ok(Box::new(file)),
- // If the error is that the file was not found, attempt to
create the file and any necessary parent directories.
- Err(source) if source.kind() == ErrorKind::NotFound => {
- // Get the path to the parent directory of the file.
- let parent = path.parent().ok_or_else(||
Error::UnableToCreateFile {
- path: path.to_path_buf(),
- source,
- })?;
-
- // Create the parent directory and any necessary ancestors.
- tokio::fs::create_dir_all(parent)
- .await
- // If creating the directory fails, return a
`UnableToCreateDirSnafu` error.
- .context(UnableToCreateDirSnafu { path: parent })?;
- // Try again to open the file.
- continue;
- }
- // If any other error occurs, return a `UnableToOpenFile`
error.
- Err(source) => return Err(Error::UnableToOpenFile { source,
path }.into()),
- }
- }
- }
-
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let location = location.clone();
let path = self.config.path_to_filesystem(&location)?;
@@ -1449,97 +1410,10 @@ mod tests {
mod not_wasm_tests {
use crate::local::LocalFileSystem;
use crate::{ObjectStore, Path};
- use bytes::Bytes;
use std::time::Duration;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
- #[tokio::test]
- async fn creates_dir_if_not_present_append() {
- let root = TempDir::new().unwrap();
- let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
-
- let location = Path::from("nested/file/test_file");
-
- let data = Bytes::from("arbitrary data");
- let expected_data = data.clone();
-
- let mut writer = integration.append(&location).await.unwrap();
-
- writer.write_all(data.as_ref()).await.unwrap();
-
- writer.flush().await.unwrap();
-
- let read_data = integration
- .get(&location)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap();
- assert_eq!(&*read_data, expected_data);
- }
-
- #[tokio::test]
- async fn unknown_length_append() {
- let root = TempDir::new().unwrap();
- let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
-
- let location = Path::from("some_file");
-
- let data = Bytes::from("arbitrary data");
- let expected_data = data.clone();
- let mut writer = integration.append(&location).await.unwrap();
-
- writer.write_all(data.as_ref()).await.unwrap();
- writer.flush().await.unwrap();
-
- let read_data = integration
- .get(&location)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap();
- assert_eq!(&*read_data, expected_data);
- }
-
- #[tokio::test]
- async fn multiple_append() {
- let root = TempDir::new().unwrap();
- let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
-
- let location = Path::from("some_file");
-
- let data = vec![
- Bytes::from("arbitrary"),
- Bytes::from("data"),
- Bytes::from("gnz"),
- ];
-
- let mut writer = integration.append(&location).await.unwrap();
- for d in &data {
- writer.write_all(d).await.unwrap();
- }
- writer.flush().await.unwrap();
-
- let mut writer = integration.append(&location).await.unwrap();
- for d in &data {
- writer.write_all(d).await.unwrap();
- }
- writer.flush().await.unwrap();
-
- let read_data = integration
- .get(&location)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap();
- let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
- assert_eq!(&*read_data, expected_data);
- }
-
#[tokio::test]
async fn test_cleanup_intermediate_files() {
let root = TempDir::new().unwrap();
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 9d79a798ad..3823001238 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -205,14 +205,6 @@ impl ObjectStore for InMemory {
Ok(())
}
- async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite +
Unpin + Send>> {
- Ok(Box::new(InMemoryAppend {
- location: location.clone(),
- data: Vec::<u8>::new(),
- storage: SharedStorage::clone(&self.storage),
- }))
- }
-
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();
@@ -443,53 +435,8 @@ impl AsyncWrite for InMemoryUpload {
}
}
-struct InMemoryAppend {
- location: Path,
- data: Vec<u8>,
- storage: Arc<RwLock<Storage>>,
-}
-
-impl AsyncWrite for InMemoryAppend {
- fn poll_write(
- mut self: Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- self.data.extend_from_slice(buf);
- Poll::Ready(Ok(buf.len()))
- }
-
- fn poll_flush(
- mut self: Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- let storage = Arc::clone(&self.storage);
-
- let mut writer = storage.write();
-
- if let Some(entry) = writer.map.remove(&self.location) {
- let buf = std::mem::take(&mut self.data);
- let concat = Bytes::from_iter(entry.data.into_iter().chain(buf));
- writer.insert(&self.location, concat);
- } else {
- let data = Bytes::from(std::mem::take(&mut self.data));
- writer.insert(&self.location, data);
- };
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- self.poll_flush(cx)
- }
-}
-
#[cfg(test)]
mod tests {
- use tokio::io::AsyncWriteExt;
-
use super::*;
use crate::tests::*;
@@ -577,50 +524,4 @@ mod tests {
panic!("unexpected error type: {err:?}");
}
}
-
- #[tokio::test]
- async fn test_append_new() {
- let in_memory = InMemory::new();
- let location = Path::from("some_file");
- let data = Bytes::from("arbitrary data");
- let expected_data = data.clone();
-
- let mut writer = in_memory.append(&location).await.unwrap();
- writer.write_all(&data).await.unwrap();
- writer.flush().await.unwrap();
-
- let read_data = in_memory
- .get(&location)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap();
- assert_eq!(&*read_data, expected_data);
- }
-
- #[tokio::test]
- async fn test_append_existing() {
- let in_memory = InMemory::new();
- let location = Path::from("some_file");
- let data = Bytes::from("arbitrary");
- let data_appended = Bytes::from(" data");
- let expected_data = Bytes::from("arbitrary data");
-
- let mut writer = in_memory.append(&location).await.unwrap();
- writer.write_all(&data).await.unwrap();
- writer.flush().await.unwrap();
-
- writer.write_all(&data_appended).await.unwrap();
- writer.flush().await.unwrap();
-
- let read_data = in_memory
- .get(&location)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap();
- assert_eq!(&*read_data, expected_data);
- }
}
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 68101307fb..38f9b07bbd 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -103,12 +103,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
let full_path = self.full_path(location);
self.inner.abort_multipart(&full_path, multipart_id).await
}
-
- async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite +
Unpin + Send>> {
- let full_path = self.full_path(location);
- self.inner.append(&full_path).await
- }
-
async fn get(&self, location: &Path) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index dcd2c04bcf..252256a459 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -169,10 +169,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
Err(super::Error::NotImplemented)
}
- async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite +
Unpin + Send>> {
- Err(super::Error::NotImplemented)
- }
-
async fn get(&self, location: &Path) -> Result<GetResult> {
sleep(self.config().wait_get_per_call).await;