This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2c9e2e9a95 Add ObjectStore BufReader (#4762) (#4857)
2c9e2e9a95 is described below
commit 2c9e2e9a95b9defd59d4ad59970b87a6fb7fa58c
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Sep 25 18:24:15 2023 +0100
Add ObjectStore BufReader (#4762) (#4857)
* Add ObjectStore BufReader (#4762)
* Clippy
* More Clippy
* Fix MSRV
* Fix doc
---
object_store/src/buffered.rs | 293 +++++++++++++++++++++++++++++++++++++++++++
object_store/src/lib.rs | 1 +
2 files changed, 294 insertions(+)
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
new file mode 100644
index 0000000000..bdc3f4c772
--- /dev/null
+++ b/object_store/src/buffered.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for performing tokio-style buffered IO
+
+use crate::path::Path;
+use crate::{ObjectMeta, ObjectStore};
+use bytes::Bytes;
+use futures::future::{BoxFuture, FutureExt};
+use futures::ready;
+use std::cmp::Ordering;
+use std::io::{Error, ErrorKind, SeekFrom};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, ReadBuf};
+
/// The default buffer size used by [`BufReader`] (1 MiB)
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;
+
/// An async-buffered reader compatible with the tokio IO traits
///
/// Internally this maintains a buffer of the requested size, and uses [`ObjectStore::get_range`]
/// to populate its internal buffer once depleted. This buffer is cleared on seek.
///
/// Whilst simple, this interface will typically be outperformed by the native [`ObjectStore`]
/// methods that better map to the network APIs. This is because most object stores have
/// very [high first-byte latencies], on the order of 100-200ms, and so avoiding unnecessary
/// round-trips is critical to throughput.
///
/// Systems looking to sequentially scan a file should instead consider using [`ObjectStore::get`],
/// or [`ObjectStore::get_opts`], or [`ObjectStore::get_range`] to read a particular range.
///
/// Systems looking to read multiple ranges of a file should instead consider using
/// [`ObjectStore::get_ranges`], which will optimise the vectored IO.
///
/// [high first-byte latencies]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
pub struct BufReader {
    /// The object store to fetch data from
    store: Arc<dyn ObjectStore>,
    /// The size of the object, captured from [`ObjectMeta`] at construction
    size: u64,
    /// The path to the object
    path: Path,
    /// The current position in the object (may exceed `size` after a seek)
    cursor: u64,
    /// The number of bytes to read in a single request
    capacity: usize,
    /// The buffered data if any
    buffer: Buffer,
}
+
+impl std::fmt::Debug for BufReader {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("BufReader")
+ .field("path", &self.path)
+ .field("size", &self.size)
+ .field("capacity", &self.capacity)
+ .finish()
+ }
+}
+
/// The state of [`BufReader`]'s internal buffer / in-flight fetch
enum Buffer {
    /// No buffered data; the next fill will issue a range request
    Empty,
    /// A `get_range` request is in flight
    Pending(BoxFuture<'static, std::io::Result<Bytes>>),
    /// Fetched data not yet consumed
    Ready(Bytes),
}
+
impl BufReader {
    /// Create a new [`BufReader`] from the provided [`ObjectMeta`] and [`ObjectStore`]
    pub fn new(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> Self {
        Self::with_capacity(store, meta, DEFAULT_BUFFER_SIZE)
    }

    /// Create a new [`BufReader`] from the provided [`ObjectMeta`], [`ObjectStore`], and `capacity`
    pub fn with_capacity(
        store: Arc<dyn ObjectStore>,
        meta: &ObjectMeta,
        capacity: usize,
    ) -> Self {
        Self {
            path: meta.location.clone(),
            size: meta.size as _,
            store,
            capacity,
            cursor: 0,
            buffer: Buffer::Empty,
        }
    }

    /// Drives the `Empty -> Pending -> Ready` buffer state machine, returning
    /// the buffered bytes once available.
    ///
    /// `amnt` is the maximum number of bytes to request; the fetched range is
    /// clamped to the object's size. Returns an empty slice (without issuing a
    /// request) when the cursor is at or past the end of the object.
    fn poll_fill_buf_impl(
        &mut self,
        cx: &mut Context<'_>,
        amnt: usize,
    ) -> Poll<std::io::Result<&[u8]>> {
        let buf = &mut self.buffer;
        // Loop so a single poll can advance through multiple states
        // (e.g. Empty -> Pending, and Pending -> Ready if already complete).
        loop {
            match buf {
                Buffer::Empty => {
                    let store = Arc::clone(&self.store);
                    let path = self.path.clone();
                    // Clamp the requested range to the object bounds; the
                    // cursor may legitimately be beyond the end after a seek.
                    let start = self.cursor.min(self.size) as _;
                    let end = self.cursor.saturating_add(amnt as u64).min(self.size) as _;

                    if start == end {
                        // At or past EOF — report end-of-stream
                        return Poll::Ready(Ok(&[]));
                    }

                    *buf = Buffer::Pending(Box::pin(async move {
                        Ok(store.get_range(&path, start..end).await?)
                    }))
                }
                Buffer::Pending(fut) => match ready!(fut.poll_unpin(cx)) {
                    Ok(b) => *buf = Buffer::Ready(b),
                    Err(e) => return Poll::Ready(Err(e)),
                },
                Buffer::Ready(r) => return Poll::Ready(Ok(r)),
            }
        }
    }
}
+
+impl AsyncSeek for BufReader {
+ fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) ->
std::io::Result<()> {
+ self.cursor = match position {
+ SeekFrom::Start(offset) => offset,
+ SeekFrom::End(offset) => {
+ checked_add_signed(self.size,offset).ok_or_else(||
Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from end of {}
byte file would result in overflow", self.size)))?
+ }
+ SeekFrom::Current(offset) => {
+ checked_add_signed(self.cursor, offset).ok_or_else(||
Error::new(ErrorKind::InvalidInput, format!("Seeking {offset} from current
offset of {} would result in overflow", self.cursor)))?
+ }
+ };
+ self.buffer = Buffer::Empty;
+ Ok(())
+ }
+
+ fn poll_complete(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<std::io::Result<u64>> {
+ Poll::Ready(Ok(self.cursor))
+ }
+}
+
impl AsyncRead for BufReader {
    /// Fills `out` from the internal buffer, fetching from the store as needed.
    ///
    /// Note a single call copies at most one buffer's worth of data; callers
    /// (e.g. `read_to_end`) loop until EOF.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        out: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Read the maximum of the internal buffer and `out`, so a large `out`
        // is served in one request and a small `out` still fills a full buffer
        let to_read = out.remaining().max(self.capacity);
        let r = match ready!(self.poll_fill_buf_impl(cx, to_read)) {
            Ok(buf) => {
                // Copy only as much as `out` can hold; the rest stays buffered
                let to_consume = out.remaining().min(buf.len());
                out.put_slice(&buf[..to_consume]);
                self.consume(to_consume);
                Ok(())
            }
            Err(e) => Err(e),
        };
        Poll::Ready(r)
    }
}
+
+impl AsyncBufRead for BufReader {
+ fn poll_fill_buf(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::io::Result<&[u8]>> {
+ let capacity = self.capacity;
+ self.get_mut().poll_fill_buf_impl(cx, capacity)
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ match &mut self.buffer {
+ Buffer::Empty => assert_eq!(amt, 0, "cannot consume from empty
buffer"),
+ Buffer::Ready(b) => match b.len().cmp(&amt) {
+ Ordering::Less => panic!("{amt} exceeds buffer sized of {}",
b.len()),
+ Ordering::Greater => *b = b.slice(amt..),
+ Ordering::Equal => self.buffer = Buffer::Empty,
+ },
+ Buffer::Pending(_) => panic!("cannot consume from pending buffer"),
+ }
+ self.cursor += amt as u64;
+ }
+}
+
/// Port of standardised function as requires Rust 1.66
///
/// Adds the signed `rhs` to the unsigned `a`, returning `None` on overflow
/// in either direction.
///
/// <https://github.com/rust-lang/rust/pull/87601/files#diff-b9390ee807a1dae3c3128dce36df56748ad8d23c6e361c0ebba4d744bf6efdb9R1533>
#[inline]
fn checked_add_signed(a: u64, rhs: i64) -> Option<u64> {
    // Two's-complement wrapping add; `overflowed` reports an unsigned carry.
    let (sum, carried) = a.overflowing_add(rhs as u64);
    // Adding a negative `rhs` *should* carry (borrow); adding a non-negative
    // one should not. Any mismatch means the true result is out of range.
    if carried != (rhs < 0) {
        None
    } else {
        Some(sum)
    }
}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory::InMemory;
    use crate::path::Path;
    use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt};

    /// End-to-end exercise of [`BufReader`]: full reads, seek overflow
    /// handling, out-of-bounds seeks, and buffered reads at several capacities.
    #[tokio::test]
    async fn test_buf_reader() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let existent = Path::from("exists.txt");
        const BYTES: usize = 4096;

        // 4096 bytes of the repeating pattern "12345678"
        let data: Bytes = b"12345678".iter().cycle().copied().take(BYTES).collect();
        store.put(&existent, data.clone()).await.unwrap();

        let meta = store.head(&existent).await.unwrap();

        // A sequential read to EOF returns the full object
        let mut reader = BufReader::new(Arc::clone(&store), &meta);
        let mut out = Vec::with_capacity(BYTES);
        let read = reader.read_to_end(&mut out).await.unwrap();

        assert_eq!(read, BYTES);
        assert_eq!(&out, &data);

        // Seek arithmetic that would overflow u64 must error, not wrap
        let err = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap_err();
        assert_eq!(err.to_string(), "Seeking -9223372036854775808 from current offset of 4096 would result in overflow");

        reader.rewind().await.unwrap();

        let err = reader.seek(SeekFrom::Current(-1)).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Seeking -1 from current offset of 0 would result in overflow"
        );

        // Seeking beyond the bounds of the file is permitted but should return no data
        reader.seek(SeekFrom::Start(u64::MAX)).await.unwrap();
        let buf = reader.fill_buf().await.unwrap();
        assert!(buf.is_empty());

        let err = reader.seek(SeekFrom::Current(1)).await.unwrap_err();
        assert_eq!(err.to_string(), "Seeking 1 from current offset of 18446744073709551615 would result in overflow");

        // Exercise fill_buf/consume and seeks at a range of buffer capacities,
        // including ones smaller than, equal to, and larger than the object
        for capacity in [200, 1024, 4096, DEFAULT_BUFFER_SIZE] {
            let store = Arc::clone(&store);
            let mut reader = BufReader::with_capacity(store, &meta, capacity);

            // Consume the whole object 8 bytes at a time
            let mut bytes_read = 0;
            loop {
                let buf = reader.fill_buf().await.unwrap();
                if buf.is_empty() {
                    assert_eq!(bytes_read, BYTES);
                    break;
                }
                assert!(buf.starts_with(b"12345678"));
                bytes_read += 8;
                reader.consume(8);
            }

            // Relative seek back from EOF, then read the tail
            let mut buf = Vec::with_capacity(76);
            reader.seek(SeekFrom::Current(-76)).await.unwrap();
            reader.read_to_end(&mut buf).await.unwrap();
            assert_eq!(&buf, &data[BYTES - 76..]);

            // fill_buf after rewind returns at most `capacity` bytes
            reader.rewind().await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[..capacity.min(BYTES)]);

            // fill_buf from an interior offset is clamped to the object end
            reader.seek(SeekFrom::Start(325)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert_eq!(buffer, &data[325..(325 + capacity).min(BYTES)]);

            // At EOF, fill_buf yields an empty slice
            reader.seek(SeekFrom::End(0)).await.unwrap();
            let buffer = reader.fill_buf().await.unwrap();
            assert!(buffer.is_empty());
        }
    }
}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index cef10f1dd4..3fd363fd4f 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -253,6 +253,7 @@ compile_error!("Features 'gcp', 'aws', 'azure', 'http' are
not supported on wasm
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
+pub mod buffered;
#[cfg(not(target_arch = "wasm32"))]
pub mod chunked;
pub mod delimited;