This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new efa7b3421a Minor: Improve error handling in sqllogictest runner (#8544)
efa7b3421a is described below

commit efa7b3421a4b05d939e92b94554f6f7fb2164d71
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Dec 14 13:52:25 2023 -0500

    Minor: Improve error handling in sqllogictest runner (#8544)
---
 datafusion/sqllogictest/bin/sqllogictests.rs | 53 ++++++++++++++++++----------
 1 file changed, 34 insertions(+), 19 deletions(-)

diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs
index 484677d58e..aeb1cc4ec9 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -26,7 +26,7 @@ use futures::stream::StreamExt;
 use log::info;
 use sqllogictest::strict_column_validator;
 
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result};
 
 const TEST_DIRECTORY: &str = "test_files/";
 const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
@@ -84,7 +84,7 @@ async fn run_tests() -> Result<()> {
     // Doing so is safe because each slt file runs with its own
     // `SessionContext` and should not have side effects (like
     // modifying shared state like `/tmp/`)
-    let errors: Vec<_> = futures::stream::iter(read_test_files(&options))
+    let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?)
         .map(|test_file| {
             tokio::task::spawn(async move {
                 println!("Running {:?}", test_file.relative_path);
@@ -247,30 +247,45 @@ impl TestFile {
     }
 }
 
-fn read_test_files<'a>(options: &'a Options) -> Box<dyn Iterator<Item = TestFile> + 'a> {
-    Box::new(
-        read_dir_recursive(TEST_DIRECTORY)
+fn read_test_files<'a>(
+    options: &'a Options,
+) -> Result<Box<dyn Iterator<Item = TestFile> + 'a>> {
+    Ok(Box::new(
+        read_dir_recursive(TEST_DIRECTORY)?
+            .into_iter()
             .map(TestFile::new)
             .filter(|f| options.check_test_file(&f.relative_path))
             .filter(|f| f.is_slt_file())
             .filter(|f| f.check_tpch(options))
             .filter(|f| options.check_pg_compat_file(f.path.as_path())),
-    )
+    ))
 }
 
-fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBuf>> {
-    Box::new(
-        std::fs::read_dir(path)
-            .expect("Readable directory")
-            .map(|path| path.expect("Readable entry").path())
-            .flat_map(|path| {
-                if path.is_dir() {
-                    read_dir_recursive(path)
-                } else {
-                    Box::new(std::iter::once(path))
-                }
-            }),
-    )
+fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
+    let mut dst = vec![];
+    read_dir_recursive_impl(&mut dst, path.as_ref())?;
+    Ok(dst)
+}
+
+/// Append all paths recursively to dst
+fn read_dir_recursive_impl(dst: &mut Vec<PathBuf>, path: &Path) -> Result<()> {
+    let entries = std::fs::read_dir(path)
+        .map_err(|e| exec_datafusion_err!("Error reading directory {path:?}: {e}"))?;
+    for entry in entries {
+        let path = entry
+            .map_err(|e| {
+                exec_datafusion_err!("Error reading entry in directory {path:?}: {e}")
+            })?
+            .path();
+
+        if path.is_dir() {
+            read_dir_recursive_impl(dst, &path)?;
+        } else {
+            dst.push(path);
+        }
+    }
+
+    Ok(())
 }
 
 /// Parsed command line options

Reply via email to