This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new cc23cacd12 Improve object_store docs (#4978)
cc23cacd12 is described below

commit cc23cacd12703ffd604b6ca52715f52b409e0659
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 30 12:02:17 2023 +0000

    Improve object_store docs (#4978)
    
    * Improve object_store docs
    
    * Document configuration system
    
    * Review feedback
---
 object_store/src/lib.rs | 285 ++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 250 insertions(+), 35 deletions(-)

diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 51203ca4a4..69db9d97bc 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -38,13 +38,18 @@
 //!
 //! # Highlights
 //!
-//! 1. A focused, easy to use, idiomatic, well documented, high
-//! performance, `async` API.
+//! 1. A high-performance async API focused on providing a consistent interface
+//! mirroring that of object stores such as [S3]
 //!
 //! 2. Production quality, leading this crate to be used in large
-//! scale production systems, such as [crates.io] and [InfluxDB IOx].
+//! scale production systems, such as [crates.io] and [InfluxDB IOx]
 //!
-//! 3. Stable and predictable governance via the [Apache Arrow] project.
+//! 3. Support for advanced functionality, including atomic, conditional reads
+//! and writes, vectored IO, bulk deletion, and more...
+//!
+//! 4. Stable and predictable governance via the [Apache Arrow] project
+//!
+//! 5. Small dependency footprint, depending on only a small number of common crates
 //!
 //! Originally developed for [InfluxDB IOx] and subsequently donated
 //! to [Apache Arrow].
@@ -52,6 +57,8 @@
 //! [Apache Arrow]: https://arrow.apache.org/
 //! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/
 //! [crates.io]: https://github.com/rust-lang/crates.io
+//! [ACID]: https://en.wikipedia.org/wiki/ACID
+//! [S3]: https://aws.amazon.com/s3/
 //!
 //! # Available [`ObjectStore`] Implementations
 //!
@@ -79,6 +86,23 @@
     doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
 )]
 //!
+//! # Why not a Filesystem Interface?
+//!
+//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs
+//! of object stores and not filesystems, opting to provide stateless APIs instead of the cursor
+//! based interfaces such as [`Read`] or [`Seek`] favoured by filesystems.
+//!
+//! This provides some compelling advantages:
+//!
+//! * Except where explicitly stated otherwise, operations are atomic, and readers
+//! cannot observe partial and/or failed writes
+//! * Methods map directly to object store APIs, providing both efficiency and predictability
+//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
+//! * Allows for functionality not native to filesystems, such as operation preconditions
+//! and atomic multipart uploads
+//!
+//! [`BufReader`]: buffered::BufReader
+//!
 //! # Adapters
 //!
 //! [`ObjectStore`] instances can be composed with various adapters
@@ -87,8 +111,43 @@
 //! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
 //! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
 //!
+//! # Configuration System
+//!
+//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
+//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing creating a [`DynObjectStore`]
+//! from a URL and an optional list of key value pairs. This provides a flexible interface
+//! to support a wide variety of user-defined store configurations, with minimal additional
+//! application complexity.
+//!
+//! ```no_run
+//! # use url::Url;
+//! # use object_store::{parse_url, parse_url_opts};
+//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
+//! #
+//! #
+//! // Can manually create a specific store variant using the appropriate builder
+//! let store: AmazonS3 = AmazonS3Builder::from_env()
+//!     .with_bucket_name("my-bucket").build().unwrap();
 //!
-//! # List objects:
+//! // Alternatively can create an ObjectStore from an S3 URL
+//! let url = Url::parse("s3://bucket/path").unwrap();
+//! let (store, path) = parse_url(&url).unwrap();
+//! assert_eq!(path.as_ref(), "path");
+//!
+//! // Potentially with additional options
+//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
+//!
+//! // Or with URLs that encode the bucket name in the URL path
+//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
+//! let (store, path) = parse_url(&url).unwrap();
+//! assert_eq!(path.as_ref(), "path");
+//! ```
+//!
+//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
+//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
+//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
+//!
+//! # List objects
 //!
 //! Use the [`ObjectStore::list`] method to iterate over objects in
 //! remote storage or files in the local filesystem:
@@ -111,7 +170,7 @@
 //! // Recursively list all files below the 'data' path.
 //! // 1. On AWS S3 this would be the 'data/' prefix
 //! // 2. On a local filesystem, this would be the 'data' directory
-//! let prefix: Path = "data".try_into().unwrap();
+//! let prefix = Path::from("data");
 //!
 //! // Get an `async` stream of Metadata objects:
 //! let mut list_stream = object_store.list(Some(&prefix));
@@ -141,25 +200,34 @@
 //! # use futures::TryStreamExt;
 //! # use object_store::local::LocalFileSystem;
 //! # use std::sync::Arc;
-//! # use object_store::{path::Path, ObjectStore};
+//! #  use bytes::Bytes;
+//! # use object_store::{path::Path, ObjectStore, GetResult};
 //! # fn get_object_store() -> Arc<dyn ObjectStore> {
 //! #   Arc::new(LocalFileSystem::new())
 //! # }
 //! #
 //! # async fn example() {
 //! #
-//! // create an ObjectStore
+//! // Create an ObjectStore
 //! let object_store: Arc<dyn ObjectStore> = get_object_store();
 //!
 //! // Retrieve a specific file
-//! let path: Path = "data/file01.parquet".try_into().unwrap();
+//! let path = Path::from("data/file01.parquet");
+//!
+//! // Fetch just the file metadata
+//! let meta = object_store.head(&path).await.unwrap();
+//! println!("{meta:?}");
+//!
+//! // Fetch the object including metadata
+//! let result: GetResult = object_store.get(&path).await.unwrap();
+//! assert_eq!(result.meta, meta);
+//!
+//! // Buffer the entire object in memory
+//! let object: Bytes = result.bytes().await.unwrap();
+//! assert_eq!(object.len(), meta.size);
 //!
-//! // fetch the bytes from object store
-//! let stream = object_store
-//!     .get(&path)
-//!     .await
-//!     .unwrap()
-//!     .into_stream();
+//! // Alternatively stream the bytes from object storage
+//! let stream = object_store.get(&path).await.unwrap().into_stream();
 //!
 //! // Count the '0's using `try_fold` from `TryStreamExt` trait
 //! let num_zeros = stream
@@ -171,13 +239,9 @@
 //! # }
 //! ```
 //!
-//! Which will print out something like the following:
+//! #  Put Object
 //!
-//! ```text
-//! Num zeros in data/file01.parquet is 657
-//! ```
-//! #  Put object
-//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem.
+//! Use the [`ObjectStore::put`] method to atomically write data.
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
@@ -190,15 +254,17 @@
 //! # }
 //! # async fn put() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/file1".try_into().unwrap();
-//!  let bytes = Bytes::from_static(b"hello");
-//!  object_store.put(&path, bytes).await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/file1");
+//! let bytes = Bytes::from_static(b"hello");
+//! object_store.put(&path, bytes).await.unwrap();
 //! # }
 //! ```
 //!
-//! #  Multipart put object
-//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks.
+//! #  Multipart Upload
+//!
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
+//! with implementations automatically handling parallel, chunked upload where appropriate.
 //!
 //! ```
 //! # use object_store::local::LocalFileSystem;
@@ -212,16 +278,165 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
+//!
+//! let bytes = Bytes::from_static(b"hello");
+//! writer.write_all(&bytes).await.unwrap();
+//! writer.flush().await.unwrap();
+//! writer.shutdown().await.unwrap();
 //! # }
 //! ```
+//!
+//! # Vectored Read
+//!
+//! A common pattern, especially when reading structured datasets, is to need to fetch
+//! multiple, potentially non-contiguous, ranges of a particular object.
+//!
+//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
+//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
+//!
+//! ```
+//! # use object_store::local::LocalFileSystem;
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
+//! # }
+//! # async fn multi_upload() {
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     /// Data returned by last request
+//!     data: Bytes,
+//!     /// ETag identifying the object returned by the server
+//!     e_tag: String,
+//!     /// Instant of last refresh
+//!     refreshed_at: Instant,
+//! }
+//!
+//! /// Example cache that checks entries after 10 seconds for a new version
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//!                 true => e.data.clone(), // Return cached data
+//!                 false => { // Check if remote version has changed
+//!                     let opts = GetOptions {
+//!                         if_none_match: Some(e.e_tag.clone()),
+//!                         ..GetOptions::default()
+//!                     };
+//!                     match self.store.get_opts(&path, opts).await {
+//!                         Ok(d) => e.data = d.bytes().await?,
+//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
+//!                         Err(e) => return Err(e),
+//!                     };
+//!                     e.refreshed_at = Instant::now();
+//!                     e.data.clone()
+//!                 }
+//!             },
+//!             None => { // Not cached, fetch data
+//!                 let get = self.store.get(&path).await?;
+//!                 let e_tag = get.meta.e_tag.clone();
+//!                 let data = get.bytes().await?;
+//!                 if let Some(e_tag) = e_tag {
+//!                     let entry = CacheEntry {
+//!                         e_tag,
+//!                         data: data.clone(),
+//!                         refreshed_at: Instant::now(),
+//!                     };
+//!                     self.entries.insert(path.clone(), entry);
+//!                 }
+//!                 data
+//!             }
+//!         })
+//!     }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object at the given path,
+//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
+//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
+//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
+//! storage, without relying on a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {
+//! let store = get_object_store();
+//! let path = Path::from("test");
+//!
+//! // Perform a conditional update on path
+//! loop {
+//!     // Perform get request
+//!     let r = store.get(&path).await.unwrap();
+//!
+//!     // Save version information fetched
+//!     let version = UpdateVersion {
+//!         e_tag: r.meta.e_tag.clone(),
+//!         version: r.meta.version.clone(),
+//!     };
+//!
+//!     // Compute new version of object contents
+//!     let new = do_update(r.bytes().await.unwrap());
+//!
+//!     // Attempt to commit transaction
+//!     match store.put_opts(&path, new, PutMode::Update(version).into()).await {
+//!         Ok(_) => break, // Successfully committed
+//!         Err(Error::Precondition { .. }) => continue, // Object has changed, try again
+//!         Err(e) => panic!("{e}")
+//!     }
+//! }
+//! # }
+//! ```
+//!
+//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
+//! [Apache Iceberg]: https://iceberg.apache.org/
+//! [Delta Lake]: https://delta.io/
+//!
 
 #[cfg(all(
     target_arch = "wasm32",

Reply via email to