This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 94fe6bb4b0 Remove ObjectStore::append (#5016)
94fe6bb4b0 is described below

commit 94fe6bb4b0dde6f00d8853e6bebefd6b55e3f965
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Nov 1 14:33:37 2023 +0000

    Remove ObjectStore::append (#5016)
---
 object_store/Cargo.toml      |   5 --
 object_store/src/lib.rs      |  31 +----------
 object_store/src/limit.rs    |   7 ---
 object_store/src/local.rs    | 126 -------------------------------------------
 object_store/src/memory.rs   |  99 ----------------------------------
 object_store/src/prefix.rs   |   6 ---
 object_store/src/throttle.rs |   4 --
 7 files changed, 1 insertion(+), 277 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index cb820b509a..c8cf4e2802 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -53,11 +53,6 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
 reqwest = { version = "0.11", default-features = false, features = 
["rustls-tls"], optional = true }
 ring = { version = "0.17", default-features = false, features = ["std"], 
optional = true }
 rustls-pemfile = { version = "1.0", default-features = false, optional = true }
-
-[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
-tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", 
"io-util", "fs"] }
-
-[target.'cfg(target_arch = "wasm32")'.dependencies]
 tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", 
"io-util"] }
 
 [target.'cfg(target_family="unix")'.dev-dependencies]
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 69db9d97bc..1b94f816b1 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -94,8 +94,7 @@
 //!
 //! This provides some compelling advantages:
 //!
-//! * Except where explicitly stated otherwise, operations are atomic, and 
readers
-//! cannot observe partial and/or failed writes
+//! * All operations are atomic, and readers cannot observe partial and/or 
failed writes
 //! * Methods map directly to object store APIs, providing both efficiency and 
predictability
 //! * Abstracts away filesystem and operating system specific quirks, ensuring 
portability
 //! * Allows for functionality not native to filesystems, such as operation 
preconditions
@@ -559,30 +558,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// vary by object store.
     async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()>;
 
-    /// Returns an [`AsyncWrite`] that can be used to append to the object at 
`location`
-    ///
-    /// A new object will be created if it doesn't already exist, otherwise it 
will be
-    /// opened, with subsequent writes appended to the end.
-    ///
-    /// This operation cannot be supported by all stores, most use-cases 
should prefer
-    /// [`ObjectStore::put`] and [`ObjectStore::put_multipart`] for better 
portability
-    /// and stronger guarantees
-    ///
-    /// This API is not guaranteed to be atomic, in particular
-    ///
-    /// * On error, `location` may contain partial data
-    /// * Concurrent calls to [`ObjectStore::list`] may return partially 
written objects
-    /// * Concurrent calls to [`ObjectStore::get`] may return partially 
written data
-    /// * Concurrent calls to [`ObjectStore::put`] may result in data loss / 
corruption
-    /// * Concurrent calls to [`ObjectStore::append`] may result in data loss 
/ corruption
-    ///
-    /// Additionally some stores, such as Azure, may only support appending to 
objects created
-    /// with [`ObjectStore::append`], and not with [`ObjectStore::put`], 
[`ObjectStore::copy`], or
-    /// [`ObjectStore::put_multipart`]
-    async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite + 
Unpin + Send>> {
-        Err(Error::NotImplemented)
-    }
-
     /// Return the bytes that are stored at the specified location.
     async fn get(&self, location: &Path) -> Result<GetResult> {
         self.get_opts(location, GetOptions::default()).await
@@ -779,10 +754,6 @@ macro_rules! as_ref_impl {
                 self.as_ref().abort_multipart(location, multipart_id).await
             }
 
-            async fn append(&self, location: &Path) -> Result<Box<dyn 
AsyncWrite + Unpin + Send>> {
-                self.as_ref().append(location).await
-            }
-
             async fn get(&self, location: &Path) -> Result<GetResult> {
                 self.as_ref().get(location).await
             }
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index 39cc605c47..d1363d9a4d 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -94,13 +94,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.abort_multipart(location, multipart_id).await
     }
-
-    async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + 
Unpin + Send>> {
-        let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
-        let write = self.inner.append(location).await?;
-        Ok(Box::new(PermitWrapper::new(write, permit)))
-    }
-
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
         let r = self.inner.get(location).await?;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 919baf71b0..1a87dc33c7 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -350,45 +350,6 @@ impl ObjectStore for LocalFileSystem {
         .await
     }
 
-    async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + 
Unpin + Send>> {
-        // Get the path to the file from the configuration.
-        let path = self.config.path_to_filesystem(location)?;
-        loop {
-            // Create new `OpenOptions`.
-            let mut options = tokio::fs::OpenOptions::new();
-
-            // Attempt to open the file with the given options.
-            match options
-                .truncate(false)
-                .append(true)
-                .create(true)
-                .open(&path)
-                .await
-            {
-                // If the file was successfully opened, return it wrapped in a 
boxed `AsyncWrite` trait object.
-                Ok(file) => return Ok(Box::new(file)),
-                // If the error is that the file was not found, attempt to 
create the file and any necessary parent directories.
-                Err(source) if source.kind() == ErrorKind::NotFound => {
-                    // Get the path to the parent directory of the file.
-                    let parent = path.parent().ok_or_else(|| 
Error::UnableToCreateFile {
-                        path: path.to_path_buf(),
-                        source,
-                    })?;
-
-                    // Create the parent directory and any necessary ancestors.
-                    tokio::fs::create_dir_all(parent)
-                        .await
-                        // If creating the directory fails, return a 
`UnableToCreateDirSnafu` error.
-                        .context(UnableToCreateDirSnafu { path: parent })?;
-                    // Try again to open the file.
-                    continue;
-                }
-                // If any other error occurs, return a `UnableToOpenFile` 
error.
-                Err(source) => return Err(Error::UnableToOpenFile { source, 
path }.into()),
-            }
-        }
-    }
-
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         let location = location.clone();
         let path = self.config.path_to_filesystem(&location)?;
@@ -1449,97 +1410,10 @@ mod tests {
 mod not_wasm_tests {
     use crate::local::LocalFileSystem;
     use crate::{ObjectStore, Path};
-    use bytes::Bytes;
     use std::time::Duration;
     use tempfile::TempDir;
     use tokio::io::AsyncWriteExt;
 
-    #[tokio::test]
-    async fn creates_dir_if_not_present_append() {
-        let root = TempDir::new().unwrap();
-        let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
-
-        let location = Path::from("nested/file/test_file");
-
-        let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
-
-        let mut writer = integration.append(&location).await.unwrap();
-
-        writer.write_all(data.as_ref()).await.unwrap();
-
-        writer.flush().await.unwrap();
-
-        let read_data = integration
-            .get(&location)
-            .await
-            .unwrap()
-            .bytes()
-            .await
-            .unwrap();
-        assert_eq!(&*read_data, expected_data);
-    }
-
-    #[tokio::test]
-    async fn unknown_length_append() {
-        let root = TempDir::new().unwrap();
-        let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
-
-        let location = Path::from("some_file");
-
-        let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
-        let mut writer = integration.append(&location).await.unwrap();
-
-        writer.write_all(data.as_ref()).await.unwrap();
-        writer.flush().await.unwrap();
-
-        let read_data = integration
-            .get(&location)
-            .await
-            .unwrap()
-            .bytes()
-            .await
-            .unwrap();
-        assert_eq!(&*read_data, expected_data);
-    }
-
-    #[tokio::test]
-    async fn multiple_append() {
-        let root = TempDir::new().unwrap();
-        let integration = 
LocalFileSystem::new_with_prefix(root.path()).unwrap();
-
-        let location = Path::from("some_file");
-
-        let data = vec![
-            Bytes::from("arbitrary"),
-            Bytes::from("data"),
-            Bytes::from("gnz"),
-        ];
-
-        let mut writer = integration.append(&location).await.unwrap();
-        for d in &data {
-            writer.write_all(d).await.unwrap();
-        }
-        writer.flush().await.unwrap();
-
-        let mut writer = integration.append(&location).await.unwrap();
-        for d in &data {
-            writer.write_all(d).await.unwrap();
-        }
-        writer.flush().await.unwrap();
-
-        let read_data = integration
-            .get(&location)
-            .await
-            .unwrap()
-            .bytes()
-            .await
-            .unwrap();
-        let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
-        assert_eq!(&*read_data, expected_data);
-    }
-
     #[tokio::test]
     async fn test_cleanup_intermediate_files() {
         let root = TempDir::new().unwrap();
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 9d79a798ad..3823001238 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -205,14 +205,6 @@ impl ObjectStore for InMemory {
         Ok(())
     }
 
-    async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + 
Unpin + Send>> {
-        Ok(Box::new(InMemoryAppend {
-            location: location.clone(),
-            data: Vec::<u8>::new(),
-            storage: SharedStorage::clone(&self.storage),
-        }))
-    }
-
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         let entry = self.entry(location).await?;
         let e_tag = entry.e_tag.to_string();
@@ -443,53 +435,8 @@ impl AsyncWrite for InMemoryUpload {
     }
 }
 
-struct InMemoryAppend {
-    location: Path,
-    data: Vec<u8>,
-    storage: Arc<RwLock<Storage>>,
-}
-
-impl AsyncWrite for InMemoryAppend {
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize, io::Error>> {
-        self.data.extend_from_slice(buf);
-        Poll::Ready(Ok(buf.len()))
-    }
-
-    fn poll_flush(
-        mut self: Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        let storage = Arc::clone(&self.storage);
-
-        let mut writer = storage.write();
-
-        if let Some(entry) = writer.map.remove(&self.location) {
-            let buf = std::mem::take(&mut self.data);
-            let concat = Bytes::from_iter(entry.data.into_iter().chain(buf));
-            writer.insert(&self.location, concat);
-        } else {
-            let data = Bytes::from(std::mem::take(&mut self.data));
-            writer.insert(&self.location, data);
-        };
-        Poll::Ready(Ok(()))
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        self.poll_flush(cx)
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use tokio::io::AsyncWriteExt;
-
     use super::*;
 
     use crate::tests::*;
@@ -577,50 +524,4 @@ mod tests {
             panic!("unexpected error type: {err:?}");
         }
     }
-
-    #[tokio::test]
-    async fn test_append_new() {
-        let in_memory = InMemory::new();
-        let location = Path::from("some_file");
-        let data = Bytes::from("arbitrary data");
-        let expected_data = data.clone();
-
-        let mut writer = in_memory.append(&location).await.unwrap();
-        writer.write_all(&data).await.unwrap();
-        writer.flush().await.unwrap();
-
-        let read_data = in_memory
-            .get(&location)
-            .await
-            .unwrap()
-            .bytes()
-            .await
-            .unwrap();
-        assert_eq!(&*read_data, expected_data);
-    }
-
-    #[tokio::test]
-    async fn test_append_existing() {
-        let in_memory = InMemory::new();
-        let location = Path::from("some_file");
-        let data = Bytes::from("arbitrary");
-        let data_appended = Bytes::from(" data");
-        let expected_data = Bytes::from("arbitrary data");
-
-        let mut writer = in_memory.append(&location).await.unwrap();
-        writer.write_all(&data).await.unwrap();
-        writer.flush().await.unwrap();
-
-        writer.write_all(&data_appended).await.unwrap();
-        writer.flush().await.unwrap();
-
-        let read_data = in_memory
-            .get(&location)
-            .await
-            .unwrap()
-            .bytes()
-            .await
-            .unwrap();
-        assert_eq!(&*read_data, expected_data);
-    }
 }
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 68101307fb..38f9b07bbd 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -103,12 +103,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         let full_path = self.full_path(location);
         self.inner.abort_multipart(&full_path, multipart_id).await
     }
-
-    async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + 
Unpin + Send>> {
-        let full_path = self.full_path(location);
-        self.inner.append(&full_path).await
-    }
-
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let full_path = self.full_path(location);
         self.inner.get(&full_path).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index dcd2c04bcf..252256a459 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -169,10 +169,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
         Err(super::Error::NotImplemented)
     }
 
-    async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite + 
Unpin + Send>> {
-        Err(super::Error::NotImplemented)
-    }
-
     async fn get(&self, location: &Path) -> Result<GetResult> {
         sleep(self.config().wait_get_per_call).await;
 

Reply via email to