This is an automated email from the ASF dual-hosted git repository.
jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 377a4c553b Improve AggregationFuzzer error reporting (#12832)
377a4c553b is described below
commit 377a4c553b04fbcf7609384a501af9a30fe02dbe
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 14 22:24:48 2024 -0400
Improve AggregationFuzzer error reporting (#12832)
* Improve AggregationFuzzer error reporting
* simplify
* Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
* fmt
---
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 +-
.../tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs | 90 ++++++++++++++--------
2 files changed, 59 insertions(+), 33 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 64a7514ebd..34061a64d7 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -83,7 +83,7 @@ async fn test_basic_prim_aggr_no_group() {
.table_name("fuzz_table")
.build();
- fuzzer.run().await;
+ fuzzer.run().await
}
/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by single int64`
diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
index abb3404828..6daebc8942 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
use arrow::util::pretty::pretty_format_batches;
use arrow_array::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
use rand::{thread_rng, Rng};
use tokio::task::JoinSet;
@@ -132,7 +133,20 @@ struct QueryGroup {
}
impl AggregationFuzzer {
+ /// Run the fuzzer, printing an error and panicking if any of the tasks fail
pub async fn run(&self) {
+ let res = self.run_inner().await;
+
+ if let Err(e) = res {
+ // Print the error via `Display` so that it displays nicely (the default `unwrap()`
+ // prints using `Debug` which escapes newlines, and makes multi-line messages
+ // hard to read
+ println!("{e}");
+ panic!("Error!");
+ }
+ }
+
+ async fn run_inner(&self) -> Result<()> {
let mut join_set = JoinSet::new();
let mut rng = thread_rng();
@@ -157,16 +171,20 @@ impl AggregationFuzzer {
let tasks = self.generate_fuzz_tasks(query_groups).await;
for task in tasks {
- join_set.spawn(async move {
- task.run().await;
- });
+ join_set.spawn(async move { task.run().await });
}
}
while let Some(join_handle) = join_set.join_next().await {
// propagate errors
- join_handle.unwrap();
+ join_handle.map_err(|e| {
+ DataFusionError::Internal(format!(
+ "AggregationFuzzer task error: {:?}",
+ e
+ ))
+ })??;
}
+ Ok(())
}
async fn generate_fuzz_tasks(
@@ -237,45 +255,53 @@ struct AggregationFuzzTestTask {
}
impl AggregationFuzzTestTask {
- async fn run(&self) {
+ async fn run(&self) -> Result<()> {
let task_result = run_sql(&self.sql, &self.ctx_with_params.ctx)
.await
- .expect("should success to run sql");
- self.check_result(&task_result, &self.expected_result);
+ .map_err(|e| e.context(self.context_error_report()))?;
+ self.check_result(&task_result, &self.expected_result)
}
- // TODO: maybe we should persist the `expected_result` and `task_result`,
- // because the readability is not so good if we just print it.
- fn check_result(&self, task_result: &[RecordBatch], expected_result: &[RecordBatch]) {
- let result = check_equality_of_batches(task_result, expected_result);
- if let Err(e) = result {
+ fn check_result(
+ &self,
+ task_result: &[RecordBatch],
+ expected_result: &[RecordBatch],
+ ) -> Result<()> {
+ check_equality_of_batches(task_result, expected_result).map_err(|e| {
// If we found inconsistent result, we print the test details for reproducing at first
- println!(
- "##### AggregationFuzzer error report #####
- ### Sql:\n{}\n\
- ### Schema:\n{}\n\
- ### Session context params:\n{:?}\n\
- ### Inconsistent row:\n\
- - row_idx:{}\n\
- - task_row:{}\n\
- - expected_row:{}\n\
- ### Task total result:\n{}\n\
- ### Expected total result:\n{}\n\
- ### Input:\n{}\n\
- ",
- self.sql,
- self.dataset_ref.batches[0].schema_ref(),
- self.ctx_with_params.params,
+ let message = format!(
+ "{}\n\
+ ### Inconsistent row:\n\
+ - row_idx:{}\n\
+ - task_row:{}\n\
+ - expected_row:{}\n\
+ ### Task total result:\n{}\n\
+ ### Expected total result:\n{}\n\
+ ",
+ self.context_error_report(),
e.row_idx,
e.lhs_row,
e.rhs_row,
pretty_format_batches(task_result).unwrap(),
pretty_format_batches(expected_result).unwrap(),
- pretty_format_batches(&self.dataset_ref.batches).unwrap(),
);
+ DataFusionError::Internal(message)
+ })
+ }
- // Then we just panic
- panic!();
- }
+ /// Returns a formatted error message
+ fn context_error_report(&self) -> String {
+ format!(
+ "##### AggregationFuzzer error report #####\n\
+ ### Sql:\n{}\n\
+ ### Schema:\n{}\n\
+ ### Session context params:\n{:?}\n\
+ ### Input:\n{}\n\
+ ",
+ self.sql,
+ self.dataset_ref.batches[0].schema_ref(),
+ self.ctx_with_params.params,
+ pretty_format_batches(&self.dataset_ref.batches).unwrap(),
+ )
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]