This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new cc23cacd12 Improve object_store docs (#4978)
cc23cacd12 is described below
commit cc23cacd12703ffd604b6ca52715f52b409e0659
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 30 12:02:17 2023 +0000
Improve object_store docs (#4978)
* Improve object_store docs
* Document configuration system
* Review feedback
---
object_store/src/lib.rs | 285 ++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 250 insertions(+), 35 deletions(-)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 51203ca4a4..69db9d97bc 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -38,13 +38,18 @@
//!
//! # Highlights
//!
-//! 1. A focused, easy to use, idiomatic, well documented, high
-//! performance, `async` API.
+//! 1. A high-performance async API focused on providing a consistent interface
+//! mirroring that of object stores such as [S3]
//!
//! 2. Production quality, leading this crate to be used in large
-//! scale production systems, such as [crates.io] and [InfluxDB IOx].
+//! scale production systems, such as [crates.io] and [InfluxDB IOx]
//!
-//! 3. Stable and predictable governance via the [Apache Arrow] project.
+//! 3. Support for advanced functionality, including atomic, conditional reads
+//! and writes, vectored IO, bulk deletion, and more...
+//!
+//! 4. Stable and predictable governance via the [Apache Arrow] project
+//!
+//! 5. Small dependency footprint, depending on only a small number of common crates
//!
//! Originally developed for [InfluxDB IOx] and subsequently donated
//! to [Apache Arrow].
@@ -52,6 +57,8 @@
//! [Apache Arrow]: https://arrow.apache.org/
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox/
//! [crates.io]: https://github.com/rust-lang/crates.io
+//! [ACID]: https://en.wikipedia.org/wiki/ACID
+//! [S3]: https://aws.amazon.com/s3/
//!
//! # Available [`ObjectStore`] Implementations
//!
@@ -79,6 +86,23 @@
doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
)]
//!
+//! # Why not a Filesystem Interface?
+//!
+//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs
+//! of object stores and not filesystems, opting to provide stateless APIs instead of the cursor
+//! based interfaces such as [`Read`] or [`Seek`] favoured by filesystems.
+//!
+//! This provides some compelling advantages:
+//!
+//! * Except where explicitly stated otherwise, operations are atomic, and readers
+//! cannot observe partial and/or failed writes
+//! * Methods map directly to object store APIs, providing both efficiency and predictability
+//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
+//! * Allows for functionality not native to filesystems, such as operation preconditions
+//! and atomic multipart uploads
+//!
+//! [`BufReader`]: buffered::BufReader
+//!
//! # Adapters
//!
//! [`ObjectStore`] instances can be composed with various adapters
@@ -87,8 +111,43 @@
//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
//!
+//! # Configuration System
+//!
+//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
+//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing creating a [`DynObjectStore`]
+//! from a URL and an optional list of key value pairs. This provides a flexible interface
+//! to support a wide variety of user-defined store configurations, with minimal additional
+//! application complexity.
+//!
+//! ```no_run
+//! # use url::Url;
+//! # use object_store::{parse_url, parse_url_opts};
+//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
+//! #
+//! #
+//! // Can manually create a specific store variant using the appropriate builder
+//! let store: AmazonS3 = AmazonS3Builder::from_env()
+//! .with_bucket_name("my-bucket").build().unwrap();
//!
-//! # List objects:
+//! // Alternatively can create an ObjectStore from an S3 URL
+//! let url = Url::parse("s3://bucket/path").unwrap();
+//! let (store, path) = parse_url(&url).unwrap();
+//! assert_eq!(path.as_ref(), "path");
+//!
+//! // Potentially with additional options
+//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
+//!
+//! // Or with URLs that encode the bucket name in the URL path
+//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
+//! let (store, path) = parse_url(&url).unwrap();
+//! assert_eq!(path.as_ref(), "path");
+//! ```
+//!
+//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
+//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
+//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
+//!
+//! # List objects
//!
//! Use the [`ObjectStore::list`] method to iterate over objects in
//! remote storage or files in the local filesystem:
@@ -111,7 +170,7 @@
//! // Recursively list all files below the 'data' path.
//! // 1. On AWS S3 this would be the 'data/' prefix
//! // 2. On a local filesystem, this would be the 'data' directory
-//! let prefix: Path = "data".try_into().unwrap();
+//! let prefix = Path::from("data");
//!
//! // Get an `async` stream of Metadata objects:
//! let mut list_stream = object_store.list(Some(&prefix));
@@ -141,25 +200,34 @@
//! # use futures::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
-//! # use object_store::{path::Path, ObjectStore};
+//! # use bytes::Bytes;
+//! # use object_store::{path::Path, ObjectStore, GetResult};
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! #
//! # async fn example() {
//! #
-//! // create an ObjectStore
+//! // Create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Retrieve a specific file
-//! let path: Path = "data/file01.parquet".try_into().unwrap();
+//! let path = Path::from("data/file01.parquet");
+//!
+//! // Fetch just the file metadata
+//! let meta = object_store.head(&path).await.unwrap();
+//! println!("{meta:?}");
+//!
+//! // Fetch the object including metadata
+//! let result: GetResult = object_store.get(&path).await.unwrap();
+//! assert_eq!(result.meta, meta);
+//!
+//! // Buffer the entire object in memory
+//! let object: Bytes = result.bytes().await.unwrap();
+//! assert_eq!(object.len(), meta.size);
//!
-//! // fetch the bytes from object store
-//! let stream = object_store
-//! .get(&path)
-//! .await
-//! .unwrap()
-//! .into_stream();
+//! // Alternatively stream the bytes from object storage
+//! let stream = object_store.get(&path).await.unwrap().into_stream();
//!
//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
@@ -171,13 +239,9 @@
//! # }
//! ```
//!
-//! Which will print out something like the following:
+//! # Put Object
//!
-//! ```text
-//! Num zeros in data/file01.parquet is 657
-//! ```
-//! # Put object
-//! Use the [`ObjectStore::put`] method to save data in remote storage or local filesystem.
+//! Use the [`ObjectStore::put`] method to atomically write data.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
@@ -190,15 +254,17 @@
//! # }
//! # async fn put() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/file1".try_into().unwrap();
-//! let bytes = Bytes::from_static(b"hello");
-//! object_store.put(&path, bytes).await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/file1");
+//! let bytes = Bytes::from_static(b"hello");
+//! object_store.put(&path, bytes).await.unwrap();
//! # }
//! ```
//!
-//! # Multipart put object
-//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks.
+//! # Multipart Upload
+//!
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
+//! with implementations automatically handling parallel, chunked upload where appropriate.
//!
//! ```
//! # use object_store::local::LocalFileSystem;
@@ -212,16 +278,165 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
+//!
+//! let bytes = Bytes::from_static(b"hello");
+//! writer.write_all(&bytes).await.unwrap();
+//! writer.flush().await.unwrap();
+//! writer.shutdown().await.unwrap();
//! # }
//! ```
+//!
+//! # Vectored Read
+//!
+//! A common pattern, especially when reading structured datasets, is to need to fetch
+//! multiple, potentially non-contiguous, ranges of a particular object.
+//!
+//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
+//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
+//!
+//! ```
+//! # use object_store::local::LocalFileSystem;
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(LocalFileSystem::new())
+//! # }
+//! # async fn multi_upload() {
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! /// Data returned by last request
+//! data: Bytes,
+//! /// ETag identifying the object returned by the server
+//! e_tag: String,
+//! /// Instant of last refresh
+//! refreshed_at: Instant,
+//! }
+//!
+//! /// Example cache that checks entries after 10 seconds for a new version
+//! struct Cache {
+//! entries: HashMap<Path, CacheEntry>,
+//! store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//! Ok(match self.entries.get_mut(path) {
+//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! true => e.data.clone(), // Return cached data
+//! false => { // Check if remote version has changed
+//! let opts = GetOptions {
+//! if_none_match: Some(e.e_tag.clone()),
+//! ..GetOptions::default()
+//! };
+//! match self.store.get_opts(&path, opts).await {
+//! Ok(d) => e.data = d.bytes().await?,
+//! Err(Error::NotModified { .. }) => {} // Data has not changed
+//! Err(e) => return Err(e),
+//! };
+//! e.refreshed_at = Instant::now();
+//! e.data.clone()
+//! }
+//! },
+//! None => { // Not cached, fetch data
+//! let get = self.store.get(&path).await?;
+//! let e_tag = get.meta.e_tag.clone();
+//! let data = get.bytes().await?;
+//! if let Some(e_tag) = e_tag {
+//! let entry = CacheEntry {
+//! e_tag,
+//! data: data.clone(),
+//! refreshed_at: Instant::now(),
+//! };
+//! self.entries.insert(path.clone(), entry);
+//! }
+//! data
+//! }
+//! })
+//! }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object at the given path,
+//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
+//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
+//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
+//! storage, without relying on a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {
+//! let store = get_object_store();
+//! let path = Path::from("test");
+//!
+//! // Perform a conditional update on path
+//! loop {
+//! // Perform get request
+//! let r = store.get(&path).await.unwrap();
+//!
+//! // Save version information fetched
+//! let version = UpdateVersion {
+//! e_tag: r.meta.e_tag.clone(),
+//! version: r.meta.version.clone(),
+//! };
+//!
+//! // Compute new version of object contents
+//! let new = do_update(r.bytes().await.unwrap());
+//!
+//! // Attempt to commit transaction
+//! match store.put_opts(&path, new, PutMode::Update(version).into()).await {
+//! Ok(_) => break, // Successfully committed
+//! Err(Error::Precondition { .. }) => continue, // Object has changed, try again
+//! Err(e) => panic!("{e}")
+//! }
+//! }
+//! # }
+//! ```
+//!
+//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
+//! [Apache Iceberg]: https://iceberg.apache.org/
+//! [Delta Lake]: https://delta.io/
+//!
#[cfg(all(
target_arch = "wasm32",