tustvold commented on code in PR #6612:
URL: https://github.com/apache/arrow-rs/pull/6612#discussion_r1822415739
##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +237,86 @@ mod tests {
let err = e.to_string();
assert!(
err.contains("not found: No such file or directory (os
error 2)"),
- "{}",
- err
+ "{err}",
);
}
}
}
+
+ #[tokio::test]
+ async fn test_runtime_is_used() {
+ let num_actions = Arc::new(AtomicUsize::new(0));
+
+ let (a1, a2) = (num_actions.clone(), num_actions.clone());
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .on_thread_park(move || {
+ a1.fetch_add(1, Ordering::Relaxed);
+ })
+ .on_thread_unpark(move || {
+ a2.fetch_add(1, Ordering::Relaxed);
+ })
+ .build()
+ .unwrap();
+
+ let (meta, store) = get_meta_store().await;
+
+ let initial_actions = num_actions.load(Ordering::Relaxed);
+
+ let reader = ParquetObjectReader::new(store,
meta).with_runtime(rt.handle().clone());
+
+ let builder =
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+ let batches: Vec<_> =
builder.build().unwrap().try_collect().await.unwrap();
+
+ // Just copied these assert_eqs from the `test_simple` above
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_rows(), 8);
+
+ assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+ // Runtimes have to be dropped in blocking contexts, so we need to
move this one to a new
+ // blocking thread to drop it.
+ tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+ }
+
+ /// Unit test that `ParquetObjectReader::spawn`spawns on the provided
runtime
+ #[tokio::test]
+ async fn test_runtime_thread_id_different() {
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(1)
+ .build()
+ .unwrap();
+
+ let (meta, store) = get_meta_store().await;
+
+ let reader = ParquetObjectReader::new(store,
meta).with_runtime(rt.handle().clone());
+
+ let current_id = std::thread::current().id();
+
+ let other_id = reader
+ .spawn(|_, _| async move { Ok::<_,
Infallible>(std::thread::current().id()) }.boxed())
Review Comment:
```suggestion
.spawn(|_, _| async move { Ok::<_,
ParquetError>(std::thread::current().id()) }.boxed())
```
Would remove the need for the `std::convert::Infallible` conversion
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]