alamb commented on code in PR #6612:
URL: https://github.com/apache/arrow-rs/pull/6612#discussion_r1818843309


##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +231,69 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // We need to mark this with the `target_has_atomic` because the 
spawned_tasks_count() fn is
+    // only available for that cfg
+    async fn test_runtime_is_used() {
+        let num_actions = Arc::new(AtomicUsize::new(0));
+
+        let (a1, a2) = (num_actions.clone(), num_actions.clone());
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_park(move || {

Review Comment:
   TIL -- 
https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_park
   
   I wonder if this is assuming too much about how tokio / the reader operates. 
For example, does it assume that only one thread is used to run the tasks from 
the parquet reader?



##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +231,69 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // We need to mark this with the `target_has_atomic` because the 
spawned_tasks_count() fn is
+    // only available for that cfg
+    async fn test_runtime_is_used() {

Review Comment:
   An alternate testing strategy that might be easier to verify / less 
dependent on the tokio runtime behavior could be:
   1. Create a new runtime 
   2. Get a handle and register it with the parquet reader
   3. Shutdown the runtime
   4. Try and use the parquet reader 
   
   The test could be marked `#[should_panic]`, expecting a panic because the runtime was shut down
   
   



##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +231,69 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // We need to mark this with the `target_has_atomic` because the 
spawned_tasks_count() fn is
+    // only available for that cfg
+    async fn test_runtime_is_used() {
+        let num_actions = Arc::new(AtomicUsize::new(0));
+
+        let (a1, a2) = (num_actions.clone(), num_actions.clone());
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_park(move || {
+                a1.fetch_add(1, Ordering::Relaxed);
+            })
+            .on_thread_unpark(move || {
+                a2.fetch_add(1, Ordering::Relaxed);
+            })
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let initial_actions = num_actions.load(Ordering::Relaxed);
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        let batches: Vec<_> = 
builder.build().unwrap().try_collect().await.unwrap();
+
+        // Just copied these assert_eqs from the `test_timple` above
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+        // Runtimes have to be dropped in blocking contexts, so we need to 
move this one to a new
+        // blocking thread to drop it.
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
+
+    #[tokio::test]
+    async fn test_runtime_thread_id_different() {

Review Comment:
   ```suggestion
       /// Unit test that `ParquetObjectReader::spawn` spawns on the provided 
runtime
       #[tokio::test]
       async fn test_runtime_thread_id_different() {
   ```
   
   
   
   👍 
   
   This is a good unit test of `ParquetObjectReader::spawn` -- 
   
   I like how the previous test verifies that the relevant functions in 
`ParquetObjectReader` call `spawn` rather than running directly on the same 
runtime
   



##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +231,69 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // We need to mark this with the `target_has_atomic` because the 
spawned_tasks_count() fn is
+    // only available for that cfg
+    async fn test_runtime_is_used() {
+        let num_actions = Arc::new(AtomicUsize::new(0));
+
+        let (a1, a2) = (num_actions.clone(), num_actions.clone());
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_park(move || {
+                a1.fetch_add(1, Ordering::Relaxed);
+            })
+            .on_thread_unpark(move || {
+                a2.fetch_add(1, Ordering::Relaxed);
+            })
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let initial_actions = num_actions.load(Ordering::Relaxed);
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        let batches: Vec<_> = 
builder.build().unwrap().try_collect().await.unwrap();
+
+        // Just copied these assert_eqs from the `test_timple` above
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+        // Runtimes have to be dropped in blocking contexts, so we need to 
move this one to a new
+        // blocking thread to drop it.
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
+
+    #[tokio::test]
+    async fn test_runtime_thread_id_different() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let current_id = std::thread::current().id();
+
+        let other_id = reader
+            .spawn(|_, _| async move { Ok::<_, 
Infallible>(std::thread::current().id()) }.boxed())
+            .await
+            .unwrap();
+
+        assert_ne!(current_id, other_id);
+
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));

Review Comment:
   Can you also add unit tests for each of the three APIs in 
`ParquetObjectReader` in which `spawn` is used?
   * `get_bytes`
   * `get_byte_ranges`
   * `get_metadata`?



##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -99,27 +99,62 @@ impl ParquetObjectReader {
             ..self
         }
     }
+
+    /// Perform IO on the provided tokio runtime
+    ///
+    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a 
timely manner
+    /// to service IO. Therefore, running IO and CPU-bound tasks, such as 
parquet decoding,
+    /// on the same tokio runtime can lead to degraded throughput, dropped 
connections and
+    /// other issues. For more information see [here].
+    ///
+    /// [here]: 
https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+    pub fn with_runtime(self, handle: Handle) -> Self {
+        Self {
+            runtime: Some(handle),
+            ..self
+        }
+    }
+
+    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
+    where
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, 
Result<O, E>>
+            + Send
+            + 'static,
+        O: Send + 'static,
+        E: Into<ParquetError> + Send + 'static,
+    {
+        match &self.runtime {
+            Some(handle) => {
+                let path = self.meta.location.clone();
+                let store = Arc::clone(&self.store);
+                handle
+                    .spawn(async move { f(&store, &path).await })
+                    .map_ok_or_else(
+                        |e| match e.try_into_panic() {
+                            Err(e) => Err(ParquetError::External(Box::new(e))),
+                            Ok(p) => std::panic::resume_unwind(p),
+                        },
+                        |res| res.map_err(|e| e.into()),
+                    )
+                    .boxed()
+            }
+            None => f(&self.store, &self.meta.location)
+                .map_err(|e| e.into())
+                .boxed(),
+        }
+    }
 }
 
 impl AsyncFileReader for ParquetObjectReader {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, 
Result<Bytes>> {
-        self.store
-            .get_range(&self.meta.location, range)
-            .map_err(|e| e.into())
-            .boxed()
+        self.spawn(|store, path| store.get_range(path, range))
     }
 
     fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, 
Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        async move {
-            self.store
-                .get_ranges(&self.meta.location, &ranges)
-                .await
-                .map_err(|e| e.into())
-        }
-        .boxed()
+        self.spawn(|store, path| async move { store.get_ranges(path, 
&ranges).await }.boxed())

Review Comment:
   I believe we also need to wrap the future returned by `get_metadata` in 
`spawn`



##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +231,69 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    // We need to mark this with the `target_has_atomic` because the 
spawned_tasks_count() fn is

Review Comment:
   I don't understand this comment -- I don't see any other reference to 
`target_has_atomic` in this PR 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to