This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new efa7b3421a Minor: Improve error handling in sqllogictest runner (#8544)
efa7b3421a is described below
commit efa7b3421a4b05d939e92b94554f6f7fb2164d71
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Dec 14 13:52:25 2023 -0500
Minor: Improve error handling in sqllogictest runner (#8544)
---
datafusion/sqllogictest/bin/sqllogictests.rs | 53 ++++++++++++++++++----------
1 file changed, 34 insertions(+), 19 deletions(-)
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs
index 484677d58e..aeb1cc4ec9 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -26,7 +26,7 @@ use futures::stream::StreamExt;
use log::info;
use sqllogictest::strict_column_validator;
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result};
const TEST_DIRECTORY: &str = "test_files/";
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
@@ -84,7 +84,7 @@ async fn run_tests() -> Result<()> {
// Doing so is safe because each slt file runs with its own
// `SessionContext` and should not have side effects (like
// modifying shared state like `/tmp/`)
- let errors: Vec<_> = futures::stream::iter(read_test_files(&options))
+ let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?)
.map(|test_file| {
tokio::task::spawn(async move {
println!("Running {:?}", test_file.relative_path);
@@ -247,30 +247,45 @@ impl TestFile {
}
}
-fn read_test_files<'a>(options: &'a Options) -> Box<dyn Iterator<Item = TestFile> + 'a> {
- Box::new(
- read_dir_recursive(TEST_DIRECTORY)
+fn read_test_files<'a>(
+ options: &'a Options,
+) -> Result<Box<dyn Iterator<Item = TestFile> + 'a>> {
+ Ok(Box::new(
+ read_dir_recursive(TEST_DIRECTORY)?
+ .into_iter()
.map(TestFile::new)
.filter(|f| options.check_test_file(&f.relative_path))
.filter(|f| f.is_slt_file())
.filter(|f| f.check_tpch(options))
.filter(|f| options.check_pg_compat_file(f.path.as_path())),
- )
+ ))
}
-fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBuf>> {
- Box::new(
- std::fs::read_dir(path)
- .expect("Readable directory")
- .map(|path| path.expect("Readable entry").path())
- .flat_map(|path| {
- if path.is_dir() {
- read_dir_recursive(path)
- } else {
- Box::new(std::iter::once(path))
- }
- }),
- )
+fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
+ let mut dst = vec![];
+ read_dir_recursive_impl(&mut dst, path.as_ref())?;
+ Ok(dst)
+}
+
+/// Append all paths recursively to dst
+fn read_dir_recursive_impl(dst: &mut Vec<PathBuf>, path: &Path) -> Result<()> {
+ let entries = std::fs::read_dir(path)
+ .map_err(|e| exec_datafusion_err!("Error reading directory {path:?}: {e}"))?;
+ for entry in entries {
+ let path = entry
+ .map_err(|e| {
+ exec_datafusion_err!("Error reading entry in directory {path:?}: {e}")
+ })?
+ .path();
+
+ if path.is_dir() {
+ read_dir_recursive_impl(dst, &path)?;
+ } else {
+ dst.push(path);
+ }
+ }
+
+ Ok(())
}
/// Parsed command line options