alamb commented on code in PR #6612:
URL: https://github.com/apache/arrow-rs/pull/6612#discussion_r1826547937


##########
parquet/src/arrow/async_reader/store.rs:
##########
@@ -180,10 +237,86 @@ mod tests {
                 let err = e.to_string();
                 assert!(
                     err.contains("not found: No such file or directory (os 
error 2)"),
-                    "{}",
-                    err
+                    "{err}",
                 );
             }
         }
     }
+
+    #[tokio::test]
+    async fn test_runtime_is_used() {
+        let num_actions = Arc::new(AtomicUsize::new(0));
+
+        let (a1, a2) = (num_actions.clone(), num_actions.clone());
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .on_thread_park(move || {
+                a1.fetch_add(1, Ordering::Relaxed);
+            })
+            .on_thread_unpark(move || {
+                a2.fetch_add(1, Ordering::Relaxed);
+            })
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let initial_actions = num_actions.load(Ordering::Relaxed);
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+        let batches: Vec<_> = 
builder.build().unwrap().try_collect().await.unwrap();
+
+        // Just copied these assert_eqs from the `test_simple` above
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 8);
+
+        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
+
+        // Runtimes have to be dropped in blocking contexts, so we need to 
move this one to a new
+        // blocking thread to drop it.
+        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
+    }
+
+    /// Unit test that `ParquetObjectReader::spawn`spawns on the provided 
runtime
+    #[tokio::test]
+    async fn test_runtime_thread_id_different() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+
+        let (meta, store) = get_meta_store().await;
+
+        let reader = ParquetObjectReader::new(store, 
meta).with_runtime(rt.handle().clone());
+
+        let current_id = std::thread::current().id();
+
+        let other_id = reader
+            .spawn(|_, _| async move { Ok::<_, 
Infallible>(std::thread::current().id()) }.boxed())

Review Comment:
   I had this repo checked out and open in my editor, so I just made this change 
myself in commit 8d24cd79cb to speed up getting this PR merged.
   
   It results in a nice simplification



##########
parquet/src/errors.rs:
##########
@@ -107,6 +107,13 @@ impl From<str::Utf8Error> for ParquetError {
     }
 }
 
+#[cfg(test)]
+impl From<std::convert::Infallible> for ParquetError {

Review Comment:
   Removed in 8d24cd79cb



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to