This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 22bc772177 Add `ParquetObjectReader::with_runtime` (#6612)
22bc772177 is described below

commit 22bc77217797968dee34fb8fe4a6534dc04a2144
Author: June <[email protected]>
AuthorDate: Sat Nov 2 05:41:45 2024 -0600

    Add `ParquetObjectReader::with_runtime` (#6612)
    
    * Add ParquetObjectReader::with_runtime (#6248)
    
    * Add test for ParquetObjectReader::with_runtime and fix clippy complaints
    
    * Switch ParquetObjectReader runtime tests to not depend on tokio_unstable 
anymore
    
    * Add doc-comment for test_runtime_thread_id_different
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * - Add comment about why we don't use spawn for metadata
    - Remove outdated comment about target_has_atomic
    - Add test to verify reader fails when spawned on a shutdown runtime
    
    * Avoid use of Infallible and From conversion
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-json/src/writer/encoder.rs        |   2 +-
 arrow-string/src/predicate.rs           |   2 +-
 parquet/Cargo.toml                      |   2 +-
 parquet/src/arrow/async_reader/store.rs | 187 +++++++++++++++++++++++++++-----
 parquet/src/errors.rs                   |   1 -
 5 files changed, 162 insertions(+), 32 deletions(-)

diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs
index 84ed384cfd..ed430fe6a1 100644
--- a/arrow-json/src/writer/encoder.rs
+++ b/arrow-json/src/writer/encoder.rs
@@ -454,7 +454,7 @@ impl Encoder for ArrayFormatter<'_> {
 /// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the 
value with `"`
 struct RawArrayFormatter<'a>(ArrayFormatter<'a>);
 
-impl<'a> Encoder for RawArrayFormatter<'a> {
+impl Encoder for RawArrayFormatter<'_> {
     fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
         let _ = write!(out, "{}", self.0.value(idx));
     }
diff --git a/arrow-string/src/predicate.rs b/arrow-string/src/predicate.rs
index f559088e6c..408d9d45cc 100644
--- a/arrow-string/src/predicate.rs
+++ b/arrow-string/src/predicate.rs
@@ -239,7 +239,7 @@ fn equals_kernel((n, h): (&u8, &u8)) -> bool {
 }
 
 fn equals_ignore_ascii_case_kernel((n, h): (&u8, &u8)) -> bool {
-    n.to_ascii_lowercase() == h.to_ascii_lowercase()
+    n.eq_ignore_ascii_case(h)
 }
 
 /// Transforms a like `pattern` to a regex compatible pattern. To achieve 
that, it does:
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 32bc13b62a..133b5b212b 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -81,7 +81,7 @@ lz4_flex = { version = "0.11", default-features = false, 
features = ["std", "fra
 zstd = { version = "0.13", default-features = false }
 serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", 
"json"] }
-tokio = { version = "1.0", default-features = false, features = ["macros", 
"rt", "io-util", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["macros", 
"rt-multi-thread", "io-util", "fs"] }
 rand = { version = "0.8", default-features = false, features = ["std", 
"std_rng"] }
 object_store = { version = "0.11.0", default-features = false, features = 
["azure"] }
 
diff --git a/parquet/src/arrow/async_reader/store.rs 
b/parquet/src/arrow/async_reader/store.rs
index e6b47856eb..fd0397b5e1 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -15,17 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ops::Range;
-use std::sync::Arc;
+use std::{ops::Range, sync::Arc};
 
 use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{FutureExt, TryFutureExt};
-
-use object_store::{ObjectMeta, ObjectStore};
+use futures::{future::BoxFuture, FutureExt, TryFutureExt};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
+use tokio::runtime::Handle;
 
 use crate::arrow::async_reader::AsyncFileReader;
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 
 /// Reads Parquet files in object storage using [`ObjectStore`].
@@ -59,6 +57,7 @@ pub struct ParquetObjectReader {
     metadata_size_hint: Option<usize>,
     preload_column_index: bool,
     preload_offset_index: bool,
+    runtime: Option<Handle>,
 }
 
 impl ParquetObjectReader {
@@ -72,6 +71,7 @@ impl ParquetObjectReader {
             metadata_size_hint: None,
             preload_column_index: false,
             preload_offset_index: false,
+            runtime: None,
         }
     }
 
@@ -99,29 +99,70 @@ impl ParquetObjectReader {
             ..self
         }
     }
+
+    /// Perform IO on the provided tokio runtime
+    ///
+    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a 
timely manner
+    /// to service IO. Therefore, running IO and CPU-bound tasks, such as 
parquet decoding,
+    /// on the same tokio runtime can lead to degraded throughput, dropped 
connections and
+    /// other issues. For more information see [here].
+    ///
+    /// [here]: 
https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+    pub fn with_runtime(self, handle: Handle) -> Self {
+        Self {
+            runtime: Some(handle),
+            ..self
+        }
+    }
+
+    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
+    where
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, 
Result<O, E>>
+            + Send
+            + 'static,
+        O: Send + 'static,
+        E: Into<ParquetError> + Send + 'static,
+    {
+        match &self.runtime {
+            Some(handle) => {
+                let path = self.meta.location.clone();
+                let store = Arc::clone(&self.store);
+                handle
+                    .spawn(async move { f(&store, &path).await })
+                    .map_ok_or_else(
+                        |e| match e.try_into_panic() {
+                            Err(e) => Err(ParquetError::External(Box::new(e))),
+                            Ok(p) => std::panic::resume_unwind(p),
+                        },
+                        |res| res.map_err(|e| e.into()),
+                    )
+                    .boxed()
+            }
+            None => f(&self.store, &self.meta.location)
+                .map_err(|e| e.into())
+                .boxed(),
+        }
+    }
 }
 
 impl AsyncFileReader for ParquetObjectReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, 
Result<Bytes>> {
-        self.store
-            .get_range(&self.meta.location, range)
-            .map_err(|e| e.into())
-            .boxed()
+        self.spawn(|store, path| store.get_range(path, range))
     }
 
     fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, 
Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        async move {
-            self.store
-                .get_ranges(&self.meta.location, &ranges)
-                .await
-                .map_err(|e| e.into())
-        }
-        .boxed()
+        self.spawn(|store, path| async move { store.get_ranges(path, 
&ranges).await }.boxed())
     }
 
+    // This method doesn't directly call `self.spawn` because all of the IO 
that is done down the
+    // line due to this method call is done through `self.get_bytes` and/or 
`self.get_byte_ranges`.
+    // When `self` is passed into `ParquetMetaDataReader::load_and_finish`, it 
treats it as
+    // an `impl MetadataFetch` and calls those methods to get data from it. 
Due to `Self`'s impl of
+    // `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just 
delegated to
+    // `Self::get_bytes`.
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
             let file_size = self.meta.size;
@@ -138,30 +179,40 @@ impl AsyncFileReader for ParquetObjectReader {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    };
 
     use futures::TryStreamExt;
 
     use arrow::util::test_util::parquet_test_data;
+    use futures::FutureExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
-    use object_store::ObjectStore;
+    use object_store::{ObjectMeta, ObjectStore};
 
-    use crate::arrow::async_reader::ParquetObjectReader;
+    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
     use crate::arrow::ParquetRecordBatchStreamBuilder;
+    use crate::errors::ParquetError;
 
-    #[tokio::test]
-    async fn test_simple() {
+    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
         let res = parquet_test_data();
         let store = LocalFileSystem::new_with_prefix(res).unwrap();
 
-        let mut meta = store
+        let meta = store
             .head(&Path::from("alltypes_plain.parquet"))
             .await
             .unwrap();
 
-        let store = Arc::new(store) as Arc<dyn ObjectStore>;
-        let object_reader = ParquetObjectReader::new(Arc::clone(&store), 
meta.clone());
+        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
+    }
+
+    #[tokio::test]
+    async fn test_simple() {
+        let (meta, store) = get_meta_store().await;
+        let object_reader = ParquetObjectReader::new(store, meta);
+
         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
             .unwrap();
@@ -169,7 +220,11 @@ mod tests {
 
         assert_eq!(batches.len(), 1);
         assert_eq!(batches[0].num_rows(), 8);
+    }
 
+    #[tokio::test]
+    async fn test_not_found() {
+        let (mut meta, store) = get_meta_store().await;
         meta.location = Path::from("I don't exist.parquet");
 
         let object_reader = ParquetObjectReader::new(store, meta);
@@ -180,10 +235,86 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    async fn test_runtime_is_used() {
+        let num_actions = Arc::new(AtomicUsize::new(0));
+
+        let (a1, a2) = (num_actions.clone(), num_actions.clone());
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_park(move || {
+                a1.fetch_add(1, Ordering::Relaxed);
+            })
+            .on_thread_unpark(move || {
+                a2.fetch_add(1, Ordering::Relaxed);
+            })
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let initial_actions = num_actions.load(Ordering::Relaxed);
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        let batches: Vec<_> = 
builder.build().unwrap().try_collect().await.unwrap();
+
+        // Just copied these assert_eqs from the `test_simple` above
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+        // Runtimes have to be dropped in blocking contexts, so we need to 
move this one to a new
+        // blocking thread to drop it.
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
+
+    /// Unit test that `ParquetObjectReader::spawn` spawns on the provided 
runtime
+    #[tokio::test]
+    async fn test_runtime_thread_id_different() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let current_id = std::thread::current().id();
+
+        let other_id = reader
+            .spawn(|_, _| async move { Ok::<_, 
ParquetError>(std::thread::current().id()) }.boxed())
+            .await
+            .unwrap();
+
+        assert_ne!(current_id, other_id);
+
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
+
+    #[tokio::test]
+    async fn io_fails_on_shutdown_runtime() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let mut reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        rt.shutdown_background();
+
+        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();
+
+        assert!(err.to_string().contains("was cancelled"));
+    }
 }
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index bb4d2543c7..6adbffa2a2 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -106,7 +106,6 @@ impl From<str::Utf8Error> for ParquetError {
         ParquetError::External(Box::new(e))
     }
 }
-
 #[cfg(feature = "arrow")]
 impl From<ArrowError> for ParquetError {
     fn from(e: ArrowError) -> ParquetError {

Reply via email to