melgenek commented on code in PR #6393:
URL: https://github.com/apache/arrow-datafusion/pull/6393#discussion_r1202961565
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
run_tests().await
}
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
let options = Options::new();
- for (path, relative_path) in read_test_files(&options) {
- if options.complete_mode {
- run_complete_file(&path, relative_path).await?;
- } else if options.postgres_runner {
- run_test_file_with_postgres(&path, relative_path).await?;
- } else {
- run_test_file(&path, relative_path).await?;
+ let test_files: Vec<_> = read_test_files(&options).collect();
+
+ // Run all tests in parallel, reporting failures at the end
+ let results: Vec<_> = futures::stream::iter(test_files)
Review Comment:
```suggestion
// Run all tests in parallel, reporting failures at the end
let results: Vec<_> = futures::stream::iter(read_test_files(&options))
```
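There's no need to collect into an intermediate `Vec`; the iterator returned by `read_test_files` can be passed to `futures::stream::iter` directly.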
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
Review Comment:
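The Windows `main` function above creates a single-threaded Tokio runtime; I
wrote it that way by mistake. On a single-threaded runtime the spawned test
tasks run concurrently but not in parallel, so it should use
`tokio::runtime::Builder::new_multi_thread()` instead.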
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
run_tests().await
}
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
let options = Options::new();
- for (path, relative_path) in read_test_files(&options) {
- if options.complete_mode {
- run_complete_file(&path, relative_path).await?;
- } else if options.postgres_runner {
- run_test_file_with_postgres(&path, relative_path).await?;
- } else {
- run_test_file(&path, relative_path).await?;
+ let test_files: Vec<_> = read_test_files(&options).collect();
+
+ // Run all tests in parallel, reporting failures at the end
+ let results: Vec<_> = futures::stream::iter(test_files)
+ .map(|test_file| {
+ tokio::task::spawn(async move {
+ println!("Running {:?}", test_file.relative_path);
+ if options.complete_mode {
+ run_complete_file(test_file).await?;
+ } else if options.postgres_runner {
+ run_test_file_with_postgres(test_file).await?;
+ } else {
+ run_test_file(test_file).await?;
+ }
+ Ok(()) as Result<()>
+ })
+ })
+ // run up to num_cpus streams in parallel
+ .buffer_unordered(num_cpus::get())
+ .collect()
+ .await;
+
+ // Collect and examine errors
+ let errors: Vec<_> = results
+ .into_iter()
+ .filter_map(|result| {
+ match result {
+ // Tokio panic error
+ Err(e) => Some(DataFusionError::External(Box::new(e))),
+ Ok(thread_result) => match thread_result {
+ // Test run error
+ Err(e) => Some(e),
+ // success
+ Ok(_) => None,
+ },
+ }
+ })
+ .collect();
Review Comment:
```suggestion
.buffer_unordered(num_cpus::get())
.flat_map(|result| {
futures::stream::iter(match result {
// Tokio panic error
Err(e) => Some(DataFusionError::External(Box::new(e))),
Ok(thread_result) => match thread_result {
// Test run error
Err(e) => Some(e),
// success
Ok(_) => None,
},
})
})
.collect()
.await;
```
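This avoids the intermediate `collect` + `into_iter` pass that is currently used
to transform the results. `flat_map` + `futures::stream::iter` flattens each
result into a stream of zero or one elements (an `Option` is iterable), so only
the errors are collected.
Another way to achieve the same effect is to return
`tokio_stream::empty()`/`tokio_stream::once()` from the branches of the `match`
instead of `None`/`Some` wrapped in `futures::stream::iter`. Note that the arms
then have different stream types, so they would need to be unified (e.g. by boxing).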
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]