This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new aafec07e08 Add sqlite test files, progress bar, and automatic postgres 
container management into sqllogictests (#13936)
aafec07e08 is described below

commit aafec07e086463fc7ed72c704e9f7e367460618a
Author: Bruce Ritchie <[email protected]>
AuthorDate: Wed Jan 1 07:51:54 2025 -0500

    Add sqlite test files, progress bar, and automatic postgres container 
management into sqllogictests (#13936)
    
    * Fix md5 return_type to only return Utf8 as per current code impl.
    
    * Add support for sqlite test files to sqllogictest
    
    * Force version 0.24.0 of sqllogictest dependency until issue with labels 
is fixed.
    
    * Removed workaround for bug that was fixed.
    
    * Git submodule update ... err update, link to sqlite tests.
    
    * Git submodule update
    
    * Readd submodule
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .gitmodules                                        |   4 +
 datafusion-testing                                 |   1 +
 datafusion/sqllogictest/Cargo.toml                 |   9 +-
 datafusion/sqllogictest/README.md                  |  46 +-
 datafusion/sqllogictest/bin/sqllogictests.rs       | 522 ++++++++++++++++++---
 .../src/engines/datafusion_engine/runner.rs        |  65 ++-
 .../src/engines/postgres_engine/mod.rs             |  63 ++-
 docs/source/contributor-guide/getting_started.md   |   2 +-
 8 files changed, 611 insertions(+), 101 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index ec5d6208b8..037accdbe4 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,7 @@
 [submodule "testing"]
        path = testing
        url = https://github.com/apache/arrow-testing
+[submodule "datafusion-testing"]
+       path = datafusion-testing
+       url = https://github.com/apache/datafusion-testing.git
+       branch = main
diff --git a/datafusion-testing b/datafusion-testing
new file mode 160000
index 0000000000..e2e320c947
--- /dev/null
+++ b/datafusion-testing
@@ -0,0 +1 @@
+Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304
diff --git a/datafusion/sqllogictest/Cargo.toml 
b/datafusion/sqllogictest/Cargo.toml
index 5c7d909d5c..1bb88a8bd4 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -45,9 +45,11 @@ datafusion-common = { workspace = true, default-features = 
true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 futures = { workspace = true }
 half = { workspace = true, default-features = true }
+indicatif = "0.17"
 itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
+once_cell = { version = "1.20", optional = true }
 postgres-protocol = { version = "0.6.7", optional = true }
 postgres-types = { version = "0.2.8", features = ["derive", 
"with-chrono-0_4"], optional = true }
 rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
@@ -56,6 +58,8 @@ rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
 sqllogictest = "=0.24.0"
 sqlparser = { workspace = true }
 tempfile = { workspace = true }
+testcontainers = { version = "0.23", features = ["default"], optional = true }
+testcontainers-modules = { version = "0.11", features = ["postgres"], optional 
= true }
 thiserror = "2.0.0"
 tokio = { workspace = true }
 tokio-postgres = { version = "0.7.12", optional = true }
@@ -65,9 +69,12 @@ avro = ["datafusion/avro"]
 postgres = [
     "bytes",
     "chrono",
-    "tokio-postgres",
+    "once_cell",
     "postgres-types",
     "postgres-protocol",
+    "testcontainers",
+    "testcontainers-modules",
+    "tokio-postgres",
 ]
 
 [dev-dependencies]
diff --git a/datafusion/sqllogictest/README.md 
b/datafusion/sqllogictest/README.md
index 885e92fee2..4a7dc09d7d 100644
--- a/datafusion/sqllogictest/README.md
+++ b/datafusion/sqllogictest/README.md
@@ -28,13 +28,14 @@ This crate is a submodule of DataFusion that contains an 
implementation of [sqll
 ## Overview
 
 This crate uses 
[sqllogictest-rs](https://github.com/risinglightdb/sqllogictest-rs) to parse 
and run `.slt` files in the
-[`test_files`](test_files) directory of this crate.
+[`test_files`](test_files) directory of this crate or the 
[`data/sqlite`](sqlite)
+directory of the datafusion-testing crate.
 
 ## Testing setup
 
 1. `rustup update stable` DataFusion uses the latest stable release of rust
 2. `git submodule init`
-3. `git submodule update`
+3. `git submodule update --init --remote --recursive`
 
 ## Running tests: TLDR Examples
 
@@ -160,7 +161,7 @@ cargo test --test sqllogictests -- information
 Test files that start with prefix `pg_compat_` verify compatibility
 with Postgres by running the same script files both with DataFusion and with 
Postgres
 
-In order to run the sqllogictests running against a previously running 
Postgres instance, do:
+In order to have the sqllogictest run against an existing running Postgres 
instance, do:
 
 ```shell
 PG_COMPAT=true PG_URI="postgresql://[email protected]/postgres" cargo test 
--features=postgres --test sqllogictests
@@ -172,7 +173,7 @@ The environment variables:
 2. `PG_URI` contains a `libpq` style connection string, whose format is 
described in
    [the 
docs](https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url)
 
-One way to create a suitable a posgres container in docker is to use
+One way to create a suitable postgres container in docker is to use
 the [Official Image](https://hub.docker.com/_/postgres) with a command
 such as the following. Note the collation **must** be set to `C` otherwise
 `ORDER BY` will not match DataFusion and the tests will diff.
@@ -185,6 +186,15 @@ docker run \
   postgres
 ```
 
+If you do not want to create a new postgres database and you have docker
+installed you can skip providing a PG_URI env variable and the sqllogictest
+runner will automatically create a temporary postgres docker container.
+For example:
+
+```shell
+PG_COMPAT=true cargo test --features=postgres --test sqllogictests
+```
+
 ## Running Tests: `tpch`
 
 Test files in `tpch` directory runs against the `TPCH` data set (SF =
@@ -205,6 +215,34 @@ Then you need to add `INCLUDE_TPCH=true` to run tpch tests:
 INCLUDE_TPCH=true cargo test --test sqllogictests
 ```
 
+## Running Tests: `sqlite`
+
+Test files in the `data/sqlite` directory of the datafusion-testing crate were
+sourced from the [sqlite test 
suite](https://www.sqlite.org/sqllogictest/dir?ci=tip) and have been cleansed 
and updated to
+run within DataFusion's sqllogictest runner.
+
+To run the sqlite tests you need to increase the rust stack size and set
+`INCLUDE_SQLITE=true`:
+
+```shell
+export RUST_MIN_STACK=30485760;
+INCLUDE_SQLITE=true cargo test --test sqllogictests
+```
+
+Note that there are well over 5 million queries in these tests and running the
+sqlite tests will take a long time. You may wish to run them in release-nonlto 
mode:
+
+```shell
+INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests
+```
+
+The sqlite tests can also be run with the postgres runner to verify 
compatibility:
+
+```shell
+export RUST_MIN_STACK=30485760;
+PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test 
sqllogictests
+```
+
 ## Updating tests: Completion Mode
 
 In test script completion mode, `sqllogictests` reads a prototype script and 
runs the statements and queries against the
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs 
b/datafusion/sqllogictest/bin/sqllogictests.rs
index 066cc8ee98..498539c167 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -16,57 +16,129 @@
 // under the License.
 
 use clap::Parser;
+use datafusion_common::instant::Instant;
 use datafusion_common::utils::get_available_parallelism;
+use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, 
Result};
+use datafusion_common_runtime::SpawnedTask;
 use datafusion_sqllogictest::{DataFusion, TestContext};
 use futures::stream::StreamExt;
+use indicatif::{
+    HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, 
ProgressStyle,
+};
 use itertools::Itertools;
-use log::info;
-use sqllogictest::{strict_column_validator, Normalizer};
+use log::Level::{Info, Warn};
+use log::{info, log_enabled, warn};
+#[cfg(feature = "postgres")]
+use once_cell::sync::Lazy;
+use sqllogictest::{
+    parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, 
Record,
+    Validator,
+};
+#[cfg(feature = "postgres")]
+use std::env::set_var;
 use std::ffi::OsStr;
 use std::fs;
+#[cfg(feature = "postgres")]
+use std::future::Future;
 use std::path::{Path, PathBuf};
-
-use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, 
Result};
-use datafusion_common_runtime::SpawnedTask;
+#[cfg(feature = "postgres")]
+use std::{env, thread};
+#[cfg(feature = "postgres")]
+use testcontainers::core::IntoContainerPort;
+#[cfg(feature = "postgres")]
+use testcontainers::runners::AsyncRunner;
+#[cfg(feature = "postgres")]
+use testcontainers::ImageExt;
+#[cfg(feature = "postgres")]
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+#[cfg(feature = "postgres")]
+use tokio::sync::{mpsc, Mutex};
+#[cfg(feature = "postgres")]
+use ContainerCommands::{FetchHost, FetchPort};
 
 const TEST_DIRECTORY: &str = "test_files/";
+const DATAFUSION_TESTING_TEST_DIRECTORY: &str = 
"../../datafusion-testing/data/";
 const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
+const SQLITE_PREFIX: &str = "sqlite";
 
 pub fn main() -> Result<()> {
     tokio::runtime::Builder::new_multi_thread()
         .enable_all()
-        .build()
-        .unwrap()
+        .build()?
         .block_on(run_tests())
 }
 
+// Trailing whitespace from lines in SLT will typically be removed, but do not 
fail if it is not
+// If particular test wants to cover trailing whitespace on a value,
+// it should project additional non-whitespace column on the right.
 #[allow(clippy::ptr_arg)]
-fn normalizer(s: &String) -> String {
-    // Trailing whitespace from lines in SLT will typically be removed, but do 
not fail if it is not
-    // If particular test wants to cover trailing whitespace on a value,
-    // it should project additional non-whitespace column on the right.
-    s.trim_end().to_owned()
+fn value_normalizer(s: &String) -> String {
+    s.trim_end().to_string()
 }
 
-fn value_validator(
+fn sqlite_value_validator(
     normalizer: Normalizer,
     actual: &[Vec<String>],
     expected: &[String],
 ) -> bool {
-    let expected = expected.iter().map(normalizer).collect::<Vec<_>>();
-    let actual = actual
+    let normalized_expected = 
expected.iter().map(normalizer).collect::<Vec<_>>();
+    let normalized_actual = actual
+        .iter()
+        .map(|strs| strs.iter().map(normalizer).join(" "))
+        .collect_vec();
+
+    if log_enabled!(Info) && normalized_actual != normalized_expected {
+        info!("sqlite validation failed. actual vs expected:");
+        for i in 0..normalized_actual.len() {
+            info!("[{i}] {}<eol>", normalized_actual[i]);
+            info!(
+                "[{i}] {}<eol>",
+                if normalized_expected.len() >= i {
+                    &normalized_expected[i]
+                } else {
+                    "No more results"
+                }
+            );
+        }
+    }
+
+    normalized_actual == normalized_expected
+}
+
+fn df_value_validator(
+    normalizer: Normalizer,
+    actual: &[Vec<String>],
+    expected: &[String],
+) -> bool {
+    let normalized_expected = 
expected.iter().map(normalizer).collect::<Vec<_>>();
+    let normalized_actual = actual
         .iter()
         .map(|strs| strs.iter().join(" "))
-        // Editors do not preserve trailing whitespace, so expected may or may 
not lack it included
-        .map(|str| normalizer(&str))
-        .collect::<Vec<_>>();
-    actual == expected
+        .map(|str| str.trim_end().to_string())
+        .collect_vec();
+
+    if log_enabled!(Warn) && normalized_actual != normalized_expected {
+        warn!("df validation failed. actual vs expected:");
+        for i in 0..normalized_actual.len() {
+            warn!("[{i}] {}<eol>", normalized_actual[i]);
+            warn!(
+                "[{i}] {}<eol>",
+                if normalized_expected.len() >= i {
+                    &normalized_expected[i]
+                } else {
+                    "No more results"
+                }
+            );
+        }
+    }
+
+    normalized_actual == normalized_expected
 }
 
 /// Sets up an empty directory at test_files/scratch/<name>
 /// creating it if needed and clearing any file contents if it exists
 /// This allows tests for inserting to external tables or copy to
-/// to persist data to disk and have consistent state when running
+/// persist data to disk and have consistent state when running
 /// a new test
 fn setup_scratch_dir(name: &Path) -> Result<()> {
     // go from copy.slt --> copy
@@ -97,23 +169,89 @@ async fn run_tests() -> Result<()> {
     }
     options.warn_on_ignored();
 
+    #[cfg(feature = "postgres")]
+    let start_pg_database = options.postgres_runner && !is_pg_uri_set();
+    #[cfg(feature = "postgres")]
+    if start_pg_database {
+        info!("Starting postgres db ...");
+
+        thread::spawn(|| {
+            execute_blocking(start_postgres(
+                &POSTGRES_IN,
+                &POSTGRES_HOST,
+                &POSTGRES_PORT,
+                &POSTGRES_STOPPED,
+            ))
+        });
+
+        POSTGRES_IN.tx.send(FetchHost).unwrap();
+        let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
+
+        POSTGRES_IN.tx.send(FetchPort).unwrap();
+        let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
+
+        let pg_uri = 
format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
+        info!("Postgres uri is {pg_uri}");
+
+        set_var("PG_URI", pg_uri);
+    }
+
     // Run all tests in parallel, reporting failures at the end
     //
     // Doing so is safe because each slt file runs with its own
     // `SessionContext` and should not have side effects (like
     // modifying shared state like `/tmp/`)
+    let m = 
MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(1));
+    let m_style = ProgressStyle::with_template(
+        "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
+    )
+    .unwrap()
+    .progress_chars("##-");
+
+    let start = Instant::now();
+
     let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?)
         .map(|test_file| {
+            let validator = if options.include_sqlite
+                && test_file.relative_path.starts_with(SQLITE_PREFIX)
+            {
+                sqlite_value_validator
+            } else {
+                df_value_validator
+            };
+
+            let m_clone = m.clone();
+            let m_style_clone = m_style.clone();
+
             SpawnedTask::spawn(async move {
-                let file_path = test_file.relative_path.clone();
-                let start = datafusion::common::instant::Instant::now();
                 match (options.postgres_runner, options.complete) {
-                    (false, false) => run_test_file(test_file).await?,
-                    (false, true) => run_complete_file(test_file).await?,
-                    (true, false) => 
run_test_file_with_postgres(test_file).await?,
-                    (true, true) => 
run_complete_file_with_postgres(test_file).await?,
+                    (false, false) => {
+                        run_test_file(test_file, validator, m_clone, 
m_style_clone)
+                            .await?
+                    }
+                    (false, true) => {
+                        run_complete_file(test_file, validator, m_clone, 
m_style_clone)
+                            .await?
+                    }
+                    (true, false) => {
+                        run_test_file_with_postgres(
+                            test_file,
+                            validator,
+                            m_clone,
+                            m_style_clone,
+                        )
+                        .await?
+                    }
+                    (true, true) => {
+                        run_complete_file_with_postgres(
+                            test_file,
+                            validator,
+                            m_clone,
+                            m_style_clone,
+                        )
+                        .await?
+                    }
                 }
-                println!("Executed {:?}. Took {:?}", file_path, 
start.elapsed());
                 Ok(()) as Result<()>
             })
             .join()
@@ -136,6 +274,15 @@ async fn run_tests() -> Result<()> {
         .collect()
         .await;
 
+    m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?;
+
+    #[cfg(feature = "postgres")]
+    if start_pg_database {
+        println!("Stopping postgres db ...");
+        POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
+        POSTGRES_STOPPED.rx.lock().await.recv().await;
+    }
+
     // report on any errors
     if !errors.is_empty() {
         for e in &errors {
@@ -147,60 +294,148 @@ async fn run_tests() -> Result<()> {
     }
 }
 
-async fn run_test_file(test_file: TestFile) -> Result<()> {
+#[cfg(feature = "postgres")]
+fn is_pg_uri_set() -> bool {
+    match env::var("PG_URI") {
+        Ok(_) => true,
+        Err(_) => false,
+    }
+}
+
+async fn run_test_file(
+    test_file: TestFile,
+    validator: Validator,
+    mp: MultiProgress,
+    mp_style: ProgressStyle,
+) -> Result<()> {
     let TestFile {
         path,
         relative_path,
     } = test_file;
-    info!("Running with DataFusion runner: {}", path.display());
     let Some(test_ctx) = 
TestContext::try_new_for_test_file(&relative_path).await else {
         info!("Skipping: {}", path.display());
         return Ok(());
     };
     setup_scratch_dir(&relative_path)?;
+
+    let count: u64 = get_record_count(&path, "Datafusion".to_string());
+    let pb = mp.add(ProgressBar::new(count));
+
+    pb.set_style(mp_style);
+    pb.set_message(format!("{:?}", &relative_path));
+
     let mut runner = sqllogictest::Runner::new(|| async {
         Ok(DataFusion::new(
             test_ctx.session_ctx().clone(),
             relative_path.clone(),
+            pb.clone(),
         ))
     });
+    runner.add_label("Datafusion");
     runner.with_column_validator(strict_column_validator);
-    runner.with_normalizer(normalizer);
-    runner.with_validator(value_validator);
-    runner
+    runner.with_normalizer(value_normalizer);
+    runner.with_validator(validator);
+
+    let res = runner
         .run_file_async(path)
         .await
-        .map_err(|e| DataFusionError::External(Box::new(e)))
+        .map_err(|e| DataFusionError::External(Box::new(e)));
+
+    pb.finish_and_clear();
+
+    res
+}
+
+fn get_record_count(path: &PathBuf, label: String) -> u64 {
+    let records: Vec<Record<<DataFusion as AsyncDB>::ColumnType>> =
+        parse_file(path).unwrap();
+    let mut count: u64 = 0;
+
+    records.iter().for_each(|rec| match rec {
+        Record::Query { conditions, .. } => {
+            if conditions.is_empty()
+                || !conditions.contains(&Condition::SkipIf {
+                    label: label.clone(),
+                })
+                || conditions.contains(&Condition::OnlyIf {
+                    label: label.clone(),
+                })
+            {
+                count += 1;
+            }
+        }
+        Record::Statement { conditions, .. } => {
+            if conditions.is_empty()
+                || !conditions.contains(&Condition::SkipIf {
+                    label: label.clone(),
+                })
+                || conditions.contains(&Condition::OnlyIf {
+                    label: label.clone(),
+                })
+            {
+                count += 1;
+            }
+        }
+        _ => {}
+    });
+
+    count
 }
 
 #[cfg(feature = "postgres")]
-async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
+async fn run_test_file_with_postgres(
+    test_file: TestFile,
+    validator: Validator,
+    mp: MultiProgress,
+    mp_style: ProgressStyle,
+) -> Result<()> {
     use datafusion_sqllogictest::Postgres;
     let TestFile {
         path,
         relative_path,
     } = test_file;
-    info!("Running with Postgres runner: {}", path.display());
     setup_scratch_dir(&relative_path)?;
-    let mut runner =
-        sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
+
+    let count: u64 = get_record_count(&path, "postgresql".to_string());
+    let pb = mp.add(ProgressBar::new(count));
+
+    pb.set_style(mp_style);
+    pb.set_message(format!("{:?}", &relative_path));
+
+    let mut runner = sqllogictest::Runner::new(|| {
+        Postgres::connect(relative_path.clone(), pb.clone())
+    });
+    runner.add_label("postgres");
     runner.with_column_validator(strict_column_validator);
-    runner.with_normalizer(normalizer);
-    runner.with_validator(value_validator);
+    runner.with_normalizer(value_normalizer);
+    runner.with_validator(validator);
     runner
         .run_file_async(path)
         .await
         .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    pb.finish_and_clear();
+
     Ok(())
 }
 
 #[cfg(not(feature = "postgres"))]
-async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> {
+async fn run_test_file_with_postgres(
+    _test_file: TestFile,
+    _validator: Validator,
+    _mp: MultiProgress,
+    _mp_style: ProgressStyle,
+) -> Result<()> {
     use datafusion_common::plan_err;
     plan_err!("Can not run with postgres as postgres feature is not enabled")
 }
 
-async fn run_complete_file(test_file: TestFile) -> Result<()> {
+async fn run_complete_file(
+    test_file: TestFile,
+    validator: Validator,
+    mp: MultiProgress,
+    mp_style: ProgressStyle,
+) -> Result<()> {
     let TestFile {
         path,
         relative_path,
@@ -213,30 +448,48 @@ async fn run_complete_file(test_file: TestFile) -> 
Result<()> {
         return Ok(());
     };
     setup_scratch_dir(&relative_path)?;
+
+    let count: u64 = get_record_count(&path, "Datafusion".to_string());
+    let pb = mp.add(ProgressBar::new(count));
+
+    pb.set_style(mp_style);
+    pb.set_message(format!("{:?}", &relative_path));
+
     let mut runner = sqllogictest::Runner::new(|| async {
         Ok(DataFusion::new(
             test_ctx.session_ctx().clone(),
             relative_path.clone(),
+            pb.clone(),
         ))
     });
+
     let col_separator = " ";
-    runner
+    let res = runner
         .update_test_file(
             path,
             col_separator,
-            value_validator,
-            normalizer,
+            validator,
+            value_normalizer,
             strict_column_validator,
         )
         .await
         // Can't use e directly because it isn't marked Send, so turn it into 
a string.
         .map_err(|e| {
             DataFusionError::Execution(format!("Error completing 
{relative_path:?}: {e}"))
-        })
+        });
+
+    pb.finish_and_clear();
+
+    res
 }
 
 #[cfg(feature = "postgres")]
-async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> {
+async fn run_complete_file_with_postgres(
+    test_file: TestFile,
+    validator: Validator,
+    mp: MultiProgress,
+    mp_style: ProgressStyle,
+) -> Result<()> {
     use datafusion_sqllogictest::Postgres;
     let TestFile {
         path,
@@ -247,26 +500,48 @@ async fn run_complete_file_with_postgres(test_file: 
TestFile) -> Result<()> {
         path.display()
     );
     setup_scratch_dir(&relative_path)?;
-    let mut runner =
-        sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
+
+    let count: u64 = get_record_count(&path, "postgresql".to_string());
+    let pb = mp.add(ProgressBar::new(count));
+
+    pb.set_style(mp_style);
+    pb.set_message(format!("{:?}", &relative_path));
+
+    let mut runner = sqllogictest::Runner::new(|| {
+        Postgres::connect(relative_path.clone(), pb.clone())
+    });
+    runner.add_label("postgres");
+    runner.with_column_validator(strict_column_validator);
+    runner.with_normalizer(value_normalizer);
+    runner.with_validator(validator);
+
     let col_separator = " ";
-    runner
+    let res = runner
         .update_test_file(
             path,
             col_separator,
-            value_validator,
-            normalizer,
+            validator,
+            value_normalizer,
             strict_column_validator,
         )
         .await
         // Can't use e directly because it isn't marked Send, so turn it into 
a string.
         .map_err(|e| {
             DataFusionError::Execution(format!("Error completing 
{relative_path:?}: {e}"))
-        })
+        });
+
+    pb.finish_and_clear();
+
+    res
 }
 
 #[cfg(not(feature = "postgres"))]
-async fn run_complete_file_with_postgres(_test_file: TestFile) -> Result<()> {
+async fn run_complete_file_with_postgres(
+    _test_file: TestFile,
+    _validator: Validator,
+    _mp: MultiProgress,
+    _mp_style: ProgressStyle,
+) -> Result<()> {
     use datafusion_common::plan_err;
     plan_err!("Can not run with postgres as postgres feature is not enabled")
 }
@@ -282,11 +557,14 @@ struct TestFile {
 
 impl TestFile {
     fn new(path: PathBuf) -> Self {
-        let relative_path = PathBuf::from(
-            path.to_string_lossy()
-                .strip_prefix(TEST_DIRECTORY)
-                .unwrap_or(""),
-        );
+        let p = path.to_string_lossy();
+        let relative_path = PathBuf::from(if p.starts_with(TEST_DIRECTORY) {
+            p.strip_prefix(TEST_DIRECTORY).unwrap()
+        } else if p.starts_with(DATAFUSION_TESTING_TEST_DIRECTORY) {
+            p.strip_prefix(DATAFUSION_TESTING_TEST_DIRECTORY).unwrap()
+        } else {
+            ""
+        });
 
         Self {
             path,
@@ -298,6 +576,14 @@ impl TestFile {
         self.path.extension() == Some(OsStr::new("slt"))
     }
 
+    fn check_sqlite(&self, options: &Options) -> bool {
+        if !self.relative_path.starts_with(SQLITE_PREFIX) {
+            return true;
+        }
+
+        options.include_sqlite
+    }
+
     fn check_tpch(&self, options: &Options) -> bool {
         if !self.relative_path.starts_with("tpch") {
             return true;
@@ -310,15 +596,29 @@ impl TestFile {
 fn read_test_files<'a>(
     options: &'a Options,
 ) -> Result<Box<dyn Iterator<Item = TestFile> + 'a>> {
-    Ok(Box::new(
-        read_dir_recursive(TEST_DIRECTORY)?
+    let mut paths = read_dir_recursive(TEST_DIRECTORY)?
+        .into_iter()
+        .map(TestFile::new)
+        .filter(|f| options.check_test_file(&f.relative_path))
+        .filter(|f| f.is_slt_file())
+        .filter(|f| f.check_tpch(options))
+        .filter(|f| f.check_sqlite(options))
+        .filter(|f| options.check_pg_compat_file(f.path.as_path()))
+        .collect::<Vec<_>>();
+    if options.include_sqlite {
+        let mut sqlite_paths = 
read_dir_recursive(DATAFUSION_TESTING_TEST_DIRECTORY)?
             .into_iter()
             .map(TestFile::new)
             .filter(|f| options.check_test_file(&f.relative_path))
             .filter(|f| f.is_slt_file())
-            .filter(|f| f.check_tpch(options))
-            .filter(|f| options.check_pg_compat_file(f.path.as_path())),
-    ))
+            .filter(|f| f.check_sqlite(options))
+            .filter(|f| options.check_pg_compat_file(f.path.as_path()))
+            .collect::<Vec<_>>();
+
+        paths.append(&mut sqlite_paths)
+    }
+
+    Ok(Box::new(paths.into_iter()))
 }
 
 fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
@@ -350,7 +650,7 @@ fn read_dir_recursive_impl(dst: &mut Vec<PathBuf>, path: 
&Path) -> Result<()> {
 
 /// Parsed command line options
 ///
-/// This structure attempts to mimic the command line options of the built in 
rust test runner
+/// This structure attempts to mimic the command line options of the built-in 
rust test runner
 /// accepted by IDEs such as CLion that pass arguments
 ///
 /// See <https://github.com/apache/datafusion/issues/8287> for more details
@@ -367,6 +667,9 @@ struct Options {
     )]
     postgres_runner: bool,
 
+    #[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")]
+    include_sqlite: bool,
+
     #[clap(long, env = "INCLUDE_TPCH", help = "Include tpch files")]
     include_tpch: bool,
 
@@ -431,10 +734,13 @@ impl Options {
             .any(|filter| relative_path.to_string_lossy().contains(filter))
     }
 
-    /// Postgres runner executes only tests in files with specific names
+    /// Postgres runner executes only tests in files with specific names or in
+    /// specific folders
     fn check_pg_compat_file(&self, path: &Path) -> bool {
         let file_name = 
path.file_name().unwrap().to_str().unwrap().to_string();
-        !self.postgres_runner || file_name.starts_with(PG_COMPAT_FILE_PREFIX)
+        !self.postgres_runner
+            || file_name.starts_with(PG_COMPAT_FILE_PREFIX)
+            || (self.include_sqlite && 
path.to_string_lossy().contains(SQLITE_PREFIX))
     }
 
     /// Logs warning messages to stdout if any ignored options are passed
@@ -452,3 +758,87 @@ impl Options {
         }
     }
 }
+
+#[cfg(feature = "postgres")]
+pub async fn start_postgres(
+    in_channel: &Channel<ContainerCommands>,
+    host_channel: &Channel<String>,
+    port_channel: &Channel<u16>,
+    stopped_channel: &Channel<()>,
+) {
+    info!("Starting postgres test container with user postgres/postgres and db 
test");
+
+    let container = testcontainers_modules::postgres::Postgres::default()
+        .with_user("postgres")
+        .with_password("postgres")
+        .with_db_name("test")
+        .with_mapped_port(16432, 5432.tcp())
+        .with_tag("17-alpine")
+        .start()
+        .await
+        .unwrap();
+    // uncomment this if you are running docker in docker
+    // let host = "host.docker.internal".to_string();
+    let host = container.get_host().await.unwrap().to_string();
+    let port = container.get_host_port_ipv4(5432).await.unwrap();
+
+    let mut rx = in_channel.rx.lock().await;
+    while let Some(command) = rx.recv().await {
+        match command {
+            FetchHost => host_channel.tx.send(host.clone()).unwrap(),
+            FetchPort => port_channel.tx.send(port).unwrap(),
+            ContainerCommands::Stop => {
+                container.stop().await.unwrap();
+                stopped_channel.tx.send(()).unwrap();
+                rx.close();
+            }
+        }
+    }
+}
+
+#[cfg(feature = "postgres")]
+#[derive(Debug)]
+pub enum ContainerCommands {
+    FetchHost,
+    FetchPort,
+    Stop,
+}
+
+#[cfg(feature = "postgres")]
+pub struct Channel<T> {
+    pub tx: UnboundedSender<T>,
+    pub rx: Mutex<UnboundedReceiver<T>>,
+}
+
+#[cfg(feature = "postgres")]
+pub fn channel<T>() -> Channel<T> {
+    let (tx, rx) = mpsc::unbounded_channel();
+    Channel {
+        tx,
+        rx: Mutex::new(rx),
+    }
+}
+
+#[cfg(feature = "postgres")]
+pub fn execute_blocking<F: Future>(f: F) {
+    tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(f);
+}
+
+#[cfg(feature = "postgres")]
+pub struct HostPort {
+    pub host: String,
+    pub port: u16,
+}
+
+#[cfg(feature = "postgres")]
+static POSTGRES_IN: Lazy<Channel<ContainerCommands>> = Lazy::new(channel);
+#[cfg(feature = "postgres")]
+static POSTGRES_HOST: Lazy<Channel<String>> = Lazy::new(channel);
+#[cfg(feature = "postgres")]
+static POSTGRES_PORT: Lazy<Channel<u16>> = Lazy::new(channel);
+#[cfg(feature = "postgres")]
+static POSTGRES_STOPPED: Lazy<Channel<()>> = Lazy::new(channel);
diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs 
b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
index 5c24b49cfe..e696058484 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
@@ -18,26 +18,49 @@
 use std::sync::Arc;
 use std::{path::PathBuf, time::Duration};
 
+use super::{error::Result, normalize, DFSqlLogicTestError};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use datafusion::physical_plan::common::collect;
 use datafusion::physical_plan::execute_stream;
 use datafusion::prelude::SessionContext;
-use log::info;
+use indicatif::ProgressBar;
+use log::Level::{Debug, Info};
+use log::{debug, log_enabled, warn};
 use sqllogictest::DBOutput;
-
-use super::{error::Result, normalize, DFSqlLogicTestError};
+use tokio::time::Instant;
 
 use crate::engines::output::{DFColumnType, DFOutput};
 
 pub struct DataFusion {
     ctx: SessionContext,
     relative_path: PathBuf,
+    pb: ProgressBar,
 }
 
 impl DataFusion {
-    pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self {
-        Self { ctx, relative_path }
+    pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) 
-> Self {
+        Self {
+            ctx,
+            relative_path,
+            pb,
+        }
+    }
+
+    fn update_slow_count(&self) {
+        let msg = self.pb.message();
+        let split: Vec<&str> = msg.split(" ").collect();
+        let mut current_count = 0;
+
+        if split.len() > 2 {
+            // third match will be current slow count
+            current_count = split[2].parse::<i32>().unwrap();
+        }
+
+        current_count += 1;
+
+        self.pb
+            .set_message(format!("{} - {} took > 500 ms", split[0], 
current_count));
     }
 }
 
@@ -47,12 +70,32 @@ impl sqllogictest::AsyncDB for DataFusion {
     type ColumnType = DFColumnType;
 
     async fn run(&mut self, sql: &str) -> Result<DFOutput> {
-        info!(
-            "[{}] Running query: \"{}\"",
-            self.relative_path.display(),
-            sql
-        );
-        run_query(&self.ctx, sql).await
+        if log_enabled!(Debug) {
+            debug!(
+                "[{}] Running query: \"{}\"",
+                self.relative_path.display(),
+                sql
+            );
+        }
+
+        let start = Instant::now();
+        let result = run_query(&self.ctx, sql).await;
+        let duration = start.elapsed();
+
+        if duration.gt(&Duration::from_millis(500)) {
+            self.update_slow_count();
+        }
+
+        self.pb.inc(1);
+
+        if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) {
+            warn!(
+                "[{}] Running query took more than 2 sec ({duration:?}): 
\"{sql}\"",
+                self.relative_path.display()
+            );
+        }
+
+        result
     }
 
     /// Engine name of current database.
diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs 
b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
index a490488cd7..1439695d62 100644
--- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
@@ -15,22 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-/// Postgres engine implementation for sqllogictest.
-use std::path::{Path, PathBuf};
-use std::str::FromStr;
-
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{SinkExt, StreamExt};
-use log::debug;
+use log::{debug, info};
 use sqllogictest::DBOutput;
+/// Postgres engine implementation for sqllogictest.
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
+use std::time::Duration;
 use tokio::task::JoinHandle;
 
 use super::conversion::*;
 use crate::engines::output::{DFColumnType, DFOutput};
 use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
+use indicatif::ProgressBar;
 use postgres_types::Type;
 use rust_decimal::Decimal;
+use tokio::time::Instant;
 use tokio_postgres::{Column, Row};
 use types::PgRegtype;
 
@@ -55,6 +57,7 @@ pub struct Postgres {
     join_handle: JoinHandle<()>,
     /// Relative test file path
     relative_path: PathBuf,
+    pb: ProgressBar,
 }
 
 impl Postgres {
@@ -71,11 +74,11 @@ impl Postgres {
     /// ```
     ///
     /// See 
https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
 for format
-    pub async fn connect(relative_path: PathBuf) -> Result<Self> {
+    pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> 
Result<Self> {
         let uri =
             std::env::var("PG_URI").map_or(PG_URI.to_string(), 
std::convert::identity);
 
-        debug!("Using postgres connection string: {uri}");
+        info!("Using postgres connection string: {uri}");
 
         let config = tokio_postgres::Config::from_str(&uri)?;
 
@@ -113,6 +116,7 @@ impl Postgres {
             client,
             join_handle,
             relative_path,
+            pb,
         })
     }
 
@@ -181,6 +185,22 @@ impl Postgres {
         tx.commit().await?;
         Ok(DBOutput::StatementComplete(0))
     }
+
+    fn update_slow_count(&self) {
+        let msg = self.pb.message();
+        let split: Vec<&str> = msg.split(" ").collect();
+        let mut current_count = 0;
+
+        if split.len() > 2 {
+            // third match will be current slow count
+            current_count += split[2].parse::<i32>().unwrap();
+        }
+
+        current_count += 1;
+
+        self.pb
+            .set_message(format!("{} - {} took > 500 ms", split[0], 
current_count));
+    }
 }
 
 /// remove single quotes from the start and end of the string
@@ -194,16 +214,13 @@ fn no_quotes(t: &str) -> &str {
 /// return a schema name
 fn schema_name(relative_path: &Path) -> String {
     relative_path
-        .file_name()
-        .map(|name| {
-            name.to_string_lossy()
-                .chars()
-                .filter(|ch| ch.is_ascii_alphabetic())
-                .collect::<String>()
-                .trim_start_matches("pg_")
-                .to_string()
-        })
-        .unwrap_or_else(|| "default_schema".to_string())
+        .to_string_lossy()
+        .to_string()
+        .chars()
+        .filter(|ch| ch.is_ascii_alphanumeric())
+        .collect::<String>()
+        .trim_start_matches("pg_")
+        .to_string()
 }
 
 impl Drop for Postgres {
@@ -221,7 +238,7 @@ impl sqllogictest::AsyncDB for Postgres {
         &mut self,
         sql: &str,
     ) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
-        println!(
+        debug!(
             "[{}] Running query: \"{}\"",
             self.relative_path.display(),
             sql
@@ -242,14 +259,24 @@ impl sqllogictest::AsyncDB for Postgres {
         };
 
         if lower_sql.starts_with("copy") {
+            self.pb.inc(1);
             return self.run_copy_command(sql).await;
         }
 
         if !is_query_sql {
             self.client.execute(sql, &[]).await?;
+            self.pb.inc(1);
             return Ok(DBOutput::StatementComplete(0));
         }
+        let start = Instant::now();
         let rows = self.client.query(sql, &[]).await?;
+        let duration = start.elapsed();
+
+        if duration.gt(&Duration::from_millis(500)) {
+            self.update_slow_count();
+        }
+
+        self.pb.inc(1);
 
         let types: Vec<Type> = if rows.is_empty() {
             self.client
diff --git a/docs/source/contributor-guide/getting_started.md 
b/docs/source/contributor-guide/getting_started.md
index 696d6d3a0f..5d85e07f3e 100644
--- a/docs/source/contributor-guide/getting_started.md
+++ b/docs/source/contributor-guide/getting_started.md
@@ -74,7 +74,7 @@ Testing setup:
 
 - `rustup update stable` DataFusion uses the latest stable release of rust
 - `git submodule init`
-- `git submodule update`
+- `git submodule update --init --remote --recursive`
 
 Formatting instructions:
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to