alamb commented on code in PR #6393:
URL: https://github.com/apache/arrow-datafusion/pull/6393#discussion_r1204672358


##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 
 #[tokio::main]
 #[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
     run_tests().await
 }
 
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
     let options = Options::new();
 
-    for (path, relative_path) in read_test_files(&options) {
-        if options.complete_mode {
-            run_complete_file(&path, relative_path).await?;
-        } else if options.postgres_runner {
-            run_test_file_with_postgres(&path, relative_path).await?;
-        } else {
-            run_test_file(&path, relative_path).await?;
+    let test_files: Vec<_> = read_test_files(&options).collect();
+
+    // Run all tests in parallel, reporting failures at the end

Review Comment:
   In deb9a4e20 and 139b59422



##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 
 #[tokio::main]
 #[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
     run_tests().await
 }
 
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
     let options = Options::new();
 
-    for (path, relative_path) in read_test_files(&options) {
-        if options.complete_mode {
-            run_complete_file(&path, relative_path).await?;
-        } else if options.postgres_runner {
-            run_test_file_with_postgres(&path, relative_path).await?;
-        } else {
-            run_test_file(&path, relative_path).await?;
+    let test_files: Vec<_> = read_test_files(&options).collect();
+
+    // Run all tests in parallel, reporting failures at the end
+    let results: Vec<_> = futures::stream::iter(test_files)
+        .map(|test_file| {
+            tokio::task::spawn(async move {
+                println!("Running {:?}", test_file.relative_path);
+                if options.complete_mode {
+                    run_complete_file(test_file).await?;
+                } else if options.postgres_runner {
+                    run_test_file_with_postgres(test_file).await?;
+                } else {
+                    run_test_file(test_file).await?;
+                }
+                Ok(()) as Result<()>
+            })
+        })
+        // run up to num_cpus streams in parallel
+        .buffer_unordered(num_cpus::get())
+        .collect()
+        .await;
+
+    // Collect and examine errors
+    let errors: Vec<_> = results
+        .into_iter()
+        .filter_map(|result| {
+            match result {
+                // Tokio panic error
+                Err(e) => Some(DataFusionError::External(Box::new(e))),
+                Ok(thread_result) => match thread_result {
+                    // Test run error
+                    Err(e) => Some(e),
+                    // success
+                    Ok(_) => None,
+                },
+            }
+        })
+        .collect();

Review Comment:
   > Another way to achieve the same effect is to replace
   > futures::stream::iter(Some/None) by returning
   > tokio_stream::empty()/tokio_stream::once() instead of Some/None in the
   > branches of match.
   
   I actually tried this:
   
   ```rust
           .flat_map(|result| {
               // filter to keep only the errors, so we can report on them
               match result {
                   // Tokio panic error
                   Err(e) => tokio_stream::once(DataFusionError::External(Box::new(e))),
                   Ok(thread_result) => match thread_result {
                       // Test run error
                       Err(e) => tokio_stream::once(e),
                       // success
                       Ok(_) => tokio_stream::empty()
                   },
               }
           })
   ```
   
   And sadly the compiler complained that the match arms had different types:
   
   ```
   error[E0308]: `match` arms have incompatible types
      --> datafusion/core/tests/sqllogictests/src/main.rs:100:30
       |
   96  |                   Ok(thread_result) => match thread_result {
       |  ______________________________________-
   97  | |                     // Test run error
   98  | |                     Err(e) => tokio_stream::once(e),
       | |                               --------------------- this is found to be of type `tokio_stream::Once<DataFusionError>`
   99  | |                     // success
   100 | |                     Ok(_) => tokio_stream::empty()
       | |                              ^^^^^^^^^^^^^^^^^^^^^ expected `Once<DataFusionError>`, found `Empty<_>`
   101 | |                 },
       | |_________________- `match` arms have incompatible types
       |
       = note: expected struct `tokio_stream::Once<DataFusionError>`
                  found struct `tokio_stream::Empty<_>`
   
   ```



##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 
 #[tokio::main]
 #[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
     run_tests().await
 }
 
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
     let options = Options::new();
 
-    for (path, relative_path) in read_test_files(&options) {
-        if options.complete_mode {
-            run_complete_file(&path, relative_path).await?;
-        } else if options.postgres_runner {
-            run_test_file_with_postgres(&path, relative_path).await?;
-        } else {
-            run_test_file(&path, relative_path).await?;
+    let test_files: Vec<_> = read_test_files(&options).collect();
+
+    // Run all tests in parallel, reporting failures at the end
+    let results: Vec<_> = futures::stream::iter(test_files)
+        .map(|test_file| {
+            tokio::task::spawn(async move {
+                println!("Running {:?}", test_file.relative_path);
+                if options.complete_mode {
+                    run_complete_file(test_file).await?;
+                } else if options.postgres_runner {
+                    run_test_file_with_postgres(test_file).await?;
+                } else {
+                    run_test_file(test_file).await?;
+                }
+                Ok(()) as Result<()>
+            })
+        })
+        // run up to num_cpus streams in parallel
+        .buffer_unordered(num_cpus::get())
+        .collect()
+        .await;
+
+    // Collect and examine errors
+    let errors: Vec<_> = results
+        .into_iter()
+        .filter_map(|result| {
+            match result {
+                // Tokio panic error
+                Err(e) => Some(DataFusionError::External(Box::new(e))),
+                Ok(thread_result) => match thread_result {
+                    // Test run error
+                    Err(e) => Some(e),
+                    // success
+                    Ok(_) => None,
+                },
+            }
+        })
+        .collect();

Review Comment:
   TIL -- very cool @melgenek  -- thank you



##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 

Review Comment:
   in 19f145c77 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to