alamb commented on code in PR #6393:
URL: https://github.com/apache/arrow-datafusion/pull/6393#discussion_r1204672358
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
run_tests().await
}
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
let options = Options::new();
- for (path, relative_path) in read_test_files(&options) {
- if options.complete_mode {
- run_complete_file(&path, relative_path).await?;
- } else if options.postgres_runner {
- run_test_file_with_postgres(&path, relative_path).await?;
- } else {
- run_test_file(&path, relative_path).await?;
+ let test_files: Vec<_> = read_test_files(&options).collect();
+
+ // Run all tests in parallel, reporting failures at the end
Review Comment:
In deb9a4e20 and 139b59422
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
run_tests().await
}
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
let options = Options::new();
- for (path, relative_path) in read_test_files(&options) {
- if options.complete_mode {
- run_complete_file(&path, relative_path).await?;
- } else if options.postgres_runner {
- run_test_file_with_postgres(&path, relative_path).await?;
- } else {
- run_test_file(&path, relative_path).await?;
+ let test_files: Vec<_> = read_test_files(&options).collect();
+
+ // Run all tests in parallel, reporting failures at the end
+ let results: Vec<_> = futures::stream::iter(test_files)
+ .map(|test_file| {
+ tokio::task::spawn(async move {
+ println!("Running {:?}", test_file.relative_path);
+ if options.complete_mode {
+ run_complete_file(test_file).await?;
+ } else if options.postgres_runner {
+ run_test_file_with_postgres(test_file).await?;
+ } else {
+ run_test_file(test_file).await?;
+ }
+ Ok(()) as Result<()>
+ })
+ })
+ // run up to num_cpus streams in parallel
+ .buffer_unordered(num_cpus::get())
+ .collect()
+ .await;
+
+ // Collect and examine errors
+ let errors: Vec<_> = results
+ .into_iter()
+ .filter_map(|result| {
+ match result {
+ // Tokio panic error
+ Err(e) => Some(DataFusionError::External(Box::new(e))),
+ Ok(thread_result) => match thread_result {
+ // Test run error
+ Err(e) => Some(e),
+ // success
+ Ok(_) => None,
+ },
+ }
+ })
+ .collect();
Review Comment:
> Another way to achieve the same effect is to replace
> `futures::stream::iter(Some/None)` by returning
> `tokio_stream::empty()`/`tokio_stream::once()` instead of `Some`/`None` in the branches
> of the `match`.
I actually tried this:
```rust
.flat_map(|result| {
// filter to keep only the errors, so we can report on them
match result {
// Tokio panic error
Err(e) =>
tokio_stream::once(DataFusionError::External(Box::new(e))),
Ok(thread_result) => match thread_result {
// Test run error
Err(e) => tokio_stream::once(e),
// success
Ok(_) => tokio_stream::empty()
},
}
})
```
And sadly the compiler complained that the match arms had different types:
```
error[E0308]: `match` arms have incompatible types
--> datafusion/core/tests/sqllogictests/src/main.rs:100:30
|
96 | Ok(thread_result) => match thread_result {
| ______________________________________-
97 | | // Test run error
98 | | Err(e) => tokio_stream::once(e),
| | --------------------- this is found to be of type `tokio_stream::Once<DataFusionError>`
99 | | // success
100 | | Ok(_) => tokio_stream::empty()
| | ^^^^^^^^^^^^^^^^^^^^^ expected `Once<DataFusionError>`, found `Empty<_>`
101 | | },
| |_________________- `match` arms have incompatible types
|
= note: expected struct `tokio_stream::Once<DataFusionError>`
found struct `tokio_stream::Empty<_>`
```
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
run_tests().await
}
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
let options = Options::new();
- for (path, relative_path) in read_test_files(&options) {
- if options.complete_mode {
- run_complete_file(&path, relative_path).await?;
- } else if options.postgres_runner {
- run_test_file_with_postgres(&path, relative_path).await?;
- } else {
- run_test_file(&path, relative_path).await?;
+ let test_files: Vec<_> = read_test_files(&options).collect();
+
+ // Run all tests in parallel, reporting failures at the end
+ let results: Vec<_> = futures::stream::iter(test_files)
+ .map(|test_file| {
+ tokio::task::spawn(async move {
+ println!("Running {:?}", test_file.relative_path);
+ if options.complete_mode {
+ run_complete_file(test_file).await?;
+ } else if options.postgres_runner {
+ run_test_file_with_postgres(test_file).await?;
+ } else {
+ run_test_file(test_file).await?;
+ }
+ Ok(()) as Result<()>
+ })
+ })
+ // run up to num_cpus streams in parallel
+ .buffer_unordered(num_cpus::get())
+ .collect()
+ .await;
+
+ // Collect and examine errors
+ let errors: Vec<_> = results
+ .into_iter()
+ .filter_map(|result| {
+ match result {
+ // Tokio panic error
+ Err(e) => Some(DataFusionError::External(Box::new(e))),
+ Ok(thread_result) => match thread_result {
+ // Test run error
+ Err(e) => Some(e),
+ // success
+ Ok(_) => None,
+ },
+ }
+ })
+ .collect();
Review Comment:
TIL -- very cool @melgenek -- thank you
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
Review Comment:
in 19f145c77 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]