This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 22bc772177 Add `ParquetObjectReader::with_runtime` (#6612)
22bc772177 is described below
commit 22bc77217797968dee34fb8fe4a6534dc04a2144
Author: June <[email protected]>
AuthorDate: Sat Nov 2 05:41:45 2024 -0600
Add `ParquetObjectReader::with_runtime` (#6612)
* Add ParquetObjectReader::with_runtime (#6248)
* Add test for ParquetObjectReader::with_runtime and fix clippy complaints
* Switch ParquetObjectReader runtime tests to not depend on tokio_unstable
anymore
* Add doc-comment for test_runtime_thread_id_different
Co-authored-by: Andrew Lamb <[email protected]>
* - Add comment about why we don't use spawn for metadata
- Remove outdated comment about target_has_atomic
- Add test to verify reader fails when spawned on a shutdown runtime
* Avoid use of Infallible and From conversion
---------
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-json/src/writer/encoder.rs | 2 +-
arrow-string/src/predicate.rs | 2 +-
parquet/Cargo.toml | 2 +-
parquet/src/arrow/async_reader/store.rs | 187 +++++++++++++++++++++++++++-----
parquet/src/errors.rs | 1 -
5 files changed, 162 insertions(+), 32 deletions(-)
diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs
index 84ed384cfd..ed430fe6a1 100644
--- a/arrow-json/src/writer/encoder.rs
+++ b/arrow-json/src/writer/encoder.rs
@@ -454,7 +454,7 @@ impl Encoder for ArrayFormatter<'_> {
/// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the
value with `"`
struct RawArrayFormatter<'a>(ArrayFormatter<'a>);
-impl<'a> Encoder for RawArrayFormatter<'a> {
+impl Encoder for RawArrayFormatter<'_> {
fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
let _ = write!(out, "{}", self.0.value(idx));
}
diff --git a/arrow-string/src/predicate.rs b/arrow-string/src/predicate.rs
index f559088e6c..408d9d45cc 100644
--- a/arrow-string/src/predicate.rs
+++ b/arrow-string/src/predicate.rs
@@ -239,7 +239,7 @@ fn equals_kernel((n, h): (&u8, &u8)) -> bool {
}
fn equals_ignore_ascii_case_kernel((n, h): (&u8, &u8)) -> bool {
- n.to_ascii_lowercase() == h.to_ascii_lowercase()
+ n.eq_ignore_ascii_case(h)
}
/// Transforms a like `pattern` to a regex compatible pattern. To achieve
that, it does:
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 32bc13b62a..133b5b212b 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -81,7 +81,7 @@ lz4_flex = { version = "0.11", default-features = false,
features = ["std", "fra
zstd = { version = "0.13", default-features = false }
serde_json = { version = "1.0", features = ["std"], default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint",
"json"] }
-tokio = { version = "1.0", default-features = false, features = ["macros",
"rt", "io-util", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["macros",
"rt-multi-thread", "io-util", "fs"] }
rand = { version = "0.8", default-features = false, features = ["std",
"std_rng"] }
object_store = { version = "0.11.0", default-features = false, features =
["azure"] }
diff --git a/parquet/src/arrow/async_reader/store.rs
b/parquet/src/arrow/async_reader/store.rs
index e6b47856eb..fd0397b5e1 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -15,17 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use std::ops::Range;
-use std::sync::Arc;
+use std::{ops::Range, sync::Arc};
use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{FutureExt, TryFutureExt};
-
-use object_store::{ObjectMeta, ObjectStore};
+use futures::{future::BoxFuture, FutureExt, TryFutureExt};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
+use tokio::runtime::Handle;
use crate::arrow::async_reader::AsyncFileReader;
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
/// Reads Parquet files in object storage using [`ObjectStore`].
@@ -59,6 +57,7 @@ pub struct ParquetObjectReader {
metadata_size_hint: Option<usize>,
preload_column_index: bool,
preload_offset_index: bool,
+ runtime: Option<Handle>,
}
impl ParquetObjectReader {
@@ -72,6 +71,7 @@ impl ParquetObjectReader {
metadata_size_hint: None,
preload_column_index: false,
preload_offset_index: false,
+ runtime: None,
}
}
@@ -99,29 +99,70 @@ impl ParquetObjectReader {
..self
}
}
+
+ /// Perform IO on the provided tokio runtime
+ ///
+ /// Tokio is a cooperative scheduler, and relies on tasks yielding in a
timely manner
+ /// to service IO. Therefore, running IO and CPU-bound tasks, such as
parquet decoding,
+ /// on the same tokio runtime can lead to degraded throughput, dropped
connections and
+ /// other issues. For more information see [here].
+ ///
+ /// [here]:
https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+ pub fn with_runtime(self, handle: Handle) -> Self {
+ Self {
+ runtime: Some(handle),
+ ..self
+ }
+ }
+
+ fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
+ where
+ F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a,
Result<O, E>>
+ + Send
+ + 'static,
+ O: Send + 'static,
+ E: Into<ParquetError> + Send + 'static,
+ {
+ match &self.runtime {
+ Some(handle) => {
+ let path = self.meta.location.clone();
+ let store = Arc::clone(&self.store);
+ handle
+ .spawn(async move { f(&store, &path).await })
+ .map_ok_or_else(
+ |e| match e.try_into_panic() {
+ Err(e) => Err(ParquetError::External(Box::new(e))),
+ Ok(p) => std::panic::resume_unwind(p),
+ },
+ |res| res.map_err(|e| e.into()),
+ )
+ .boxed()
+ }
+ None => f(&self.store, &self.meta.location)
+ .map_err(|e| e.into())
+ .boxed(),
+ }
+ }
}
impl AsyncFileReader for ParquetObjectReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_,
Result<Bytes>> {
- self.store
- .get_range(&self.meta.location, range)
- .map_err(|e| e.into())
- .boxed()
+ self.spawn(|store, path| store.get_range(path, range))
}
fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_,
Result<Vec<Bytes>>>
where
Self: Send,
{
- async move {
- self.store
- .get_ranges(&self.meta.location, &ranges)
- .await
- .map_err(|e| e.into())
- }
- .boxed()
+ self.spawn(|store, path| async move { store.get_ranges(path,
&ranges).await }.boxed())
}
+ // This method doesn't directly call `self.spawn` because all of the IO
that is done down the
+ // line due to this method call is done through `self.get_bytes` and/or
`self.get_byte_ranges`.
+ // When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it
treats it as
+ // an `impl MetadataFetch` and calls those methods to get data from it.
Due to `Self`'s impl of
+ // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just
delegated to
+ // `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
@@ -138,30 +179,40 @@ impl AsyncFileReader for ParquetObjectReader {
#[cfg(test)]
mod tests {
- use std::sync::Arc;
+ use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ };
use futures::TryStreamExt;
use arrow::util::test_util::parquet_test_data;
+ use futures::FutureExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
- use object_store::ObjectStore;
+ use object_store::{ObjectMeta, ObjectStore};
- use crate::arrow::async_reader::ParquetObjectReader;
+ use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use crate::arrow::ParquetRecordBatchStreamBuilder;
+ use crate::errors::ParquetError;
- #[tokio::test]
- async fn test_simple() {
+ async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
let res = parquet_test_data();
let store = LocalFileSystem::new_with_prefix(res).unwrap();
- let mut meta = store
+ let meta = store
.head(&Path::from("alltypes_plain.parquet"))
.await
.unwrap();
- let store = Arc::new(store) as Arc<dyn ObjectStore>;
- let object_reader = ParquetObjectReader::new(Arc::clone(&store),
meta.clone());
+ (meta, Arc::new(store) as Arc<dyn ObjectStore>)
+ }
+
+ #[tokio::test]
+ async fn test_simple() {
+ let (meta, store) = get_meta_store().await;
+ let object_reader = ParquetObjectReader::new(store, meta);
+
let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
.await
.unwrap();
@@ -169,7 +220,11 @@ mod tests {
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 8);
+ }
+ #[tokio::test]
+ async fn test_not_found() {
+ let (mut meta, store) = get_meta_store().await;
meta.location = Path::from("I don't exist.parquet");
let object_reader = ParquetObjectReader::new(store, meta);
@@ -180,10 +235,86 @@ mod tests {
let err = e.to_string();
assert!(
err.contains("not found: No such file or directory (os
error 2)"),
- "{}",
- err
+ "{err}",
);
}
}
}
+
+ #[tokio::test]
+ async fn test_runtime_is_used() {
+ let num_actions = Arc::new(AtomicUsize::new(0));
+
+ let (a1, a2) = (num_actions.clone(), num_actions.clone());
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .on_thread_park(move || {
+ a1.fetch_add(1, Ordering::Relaxed);
+ })
+ .on_thread_unpark(move || {
+ a2.fetch_add(1, Ordering::Relaxed);
+ })
+ .build()
+ .unwrap();
+
+ let (meta, store) = get_meta_store().await;
+
+ let initial_actions = num_actions.load(Ordering::Relaxed);
+
+ let reader = ParquetObjectReader::new(store,
meta).with_runtime(rt.handle().clone());
+
+ let builder =
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+ let batches: Vec<_> =
builder.build().unwrap().try_collect().await.unwrap();
+
+ // Just copied these assert_eqs from the `test_simple` above
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 8);
+
+ assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+ // Runtimes have to be dropped in blocking contexts, so we need to
move this one to a new
+ // blocking thread to drop it.
+ tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+ }
+
+ /// Unit test that `ParquetObjectReader::spawn` spawns on the provided
runtime
+ #[tokio::test]
+ async fn test_runtime_thread_id_different() {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let (meta, store) = get_meta_store().await;
+
+ let reader = ParquetObjectReader::new(store,
meta).with_runtime(rt.handle().clone());
+
+ let current_id = std::thread::current().id();
+
+ let other_id = reader
+ .spawn(|_, _| async move { Ok::<_,
ParquetError>(std::thread::current().id()) }.boxed())
+ .await
+ .unwrap();
+
+ assert_ne!(current_id, other_id);
+
+ tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+ }
+
+ #[tokio::test]
+ async fn io_fails_on_shutdown_runtime() {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let (meta, store) = get_meta_store().await;
+
+ let mut reader = ParquetObjectReader::new(store,
meta).with_runtime(rt.handle().clone());
+
+ rt.shutdown_background();
+
+ let err = reader.get_bytes(0..1).await.unwrap_err().to_string();
+
+ assert!(err.to_string().contains("was cancelled"));
+ }
}
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index bb4d2543c7..6adbffa2a2 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -106,7 +106,6 @@ impl From<str::Utf8Error> for ParquetError {
ParquetError::External(Box::new(e))
}
}
-
#[cfg(feature = "arrow")]
impl From<ArrowError> for ParquetError {
fn from(e: ArrowError) -> ParquetError {