This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new aafec07e08 Add sqlite test files, progress bar, and automatic postgres
container management into sqllogictests (#13936)
aafec07e08 is described below
commit aafec07e086463fc7ed72c704e9f7e367460618a
Author: Bruce Ritchie <[email protected]>
AuthorDate: Wed Jan 1 07:51:54 2025 -0500
Add sqlite test files, progress bar, and automatic postgres container
management into sqllogictests (#13936)
* Fix md5 return_type to only return Utf8 as per current code impl.
* Add support for sqlite test files to sqllogictest
* Force version 0.24.0 of sqllogictest dependency until issue with labels
is fixed.
* Removed workaround for bug that was fixed.
* Git submodule update ... err update, link to sqlite tests.
* Git submodule update
* Readd submodule
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.gitmodules | 4 +
datafusion-testing | 1 +
datafusion/sqllogictest/Cargo.toml | 9 +-
datafusion/sqllogictest/README.md | 46 +-
datafusion/sqllogictest/bin/sqllogictests.rs | 522 ++++++++++++++++++---
.../src/engines/datafusion_engine/runner.rs | 65 ++-
.../src/engines/postgres_engine/mod.rs | 63 ++-
docs/source/contributor-guide/getting_started.md | 2 +-
8 files changed, 611 insertions(+), 101 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index ec5d6208b8..037accdbe4 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,7 @@
[submodule "testing"]
path = testing
url = https://github.com/apache/arrow-testing
+[submodule "datafusion-testing"]
+ path = datafusion-testing
+ url = https://github.com/apache/datafusion-testing.git
+ branch = main
diff --git a/datafusion-testing b/datafusion-testing
new file mode 160000
index 0000000000..e2e320c947
--- /dev/null
+++ b/datafusion-testing
@@ -0,0 +1 @@
+Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304
diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml
index 5c7d909d5c..1bb88a8bd4 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -45,9 +45,11 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true, default-features = true }
futures = { workspace = true }
half = { workspace = true, default-features = true }
+indicatif = "0.17"
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
+once_cell = { version = "1.20", optional = true }
postgres-protocol = { version = "0.6.7", optional = true }
postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true }
rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
@@ -56,6 +58,8 @@ rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
sqllogictest = "=0.24.0"
sqlparser = { workspace = true }
tempfile = { workspace = true }
+testcontainers = { version = "0.23", features = ["default"], optional = true }
+testcontainers-modules = { version = "0.11", features = ["postgres"], optional = true }
thiserror = "2.0.0"
tokio = { workspace = true }
tokio-postgres = { version = "0.7.12", optional = true }
@@ -65,9 +69,12 @@ avro = ["datafusion/avro"]
postgres = [
"bytes",
"chrono",
- "tokio-postgres",
+ "once_cell",
"postgres-types",
"postgres-protocol",
+ "testcontainers",
+ "testcontainers-modules",
+ "tokio-postgres",
]
[dev-dependencies]
diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md
index 885e92fee2..4a7dc09d7d 100644
--- a/datafusion/sqllogictest/README.md
+++ b/datafusion/sqllogictest/README.md
@@ -28,13 +28,14 @@ This crate is a submodule of DataFusion that contains an implementation of [sqll
## Overview
This crate uses
[sqllogictest-rs](https://github.com/risinglightdb/sqllogictest-rs) to parse
and run `.slt` files in the
-[`test_files`](test_files) directory of this crate.
+[`test_files`](test_files) directory of this crate or the [`data/sqlite`](sqlite)
+directory of the datafusion-testing crate.
## Testing setup
1. `rustup update stable` DataFusion uses the latest stable release of rust
2. `git submodule init`
-3. `git submodule update`
+3. `git submodule update --init --remote --recursive`
## Running tests: TLDR Examples
@@ -160,7 +161,7 @@ cargo test --test sqllogictests -- information
Test files that start with prefix `pg_compat_` verify compatibility
with Postgres by running the same script files both with DataFusion and with
Postgres
-In order to run the sqllogictests running against a previously running Postgres instance, do:
+In order to have the sqllogictest run against an existing running Postgres instance, do:
```shell
PG_COMPAT=true PG_URI="postgresql://[email protected]/postgres" cargo test --features=postgres --test sqllogictests
@@ -172,7 +173,7 @@ The environment variables:
2. `PG_URI` contains a `libpq` style connection string, whose format is
described in
[the docs](https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url)
-One way to create a suitable a posgres container in docker is to use
+One way to create a suitable postgres container in docker is to use
the [Official Image](https://hub.docker.com/_/postgres) with a command
such as the following. Note the collation **must** be set to `C` otherwise
`ORDER BY` will not match DataFusion and the tests will diff.
@@ -185,6 +186,15 @@ docker run \
postgres
```
+If you do not want to create a new postgres database and you have docker
+installed you can skip providing a PG_URI env variable and the sqllogictest
+runner will automatically create a temporary postgres docker container.
+For example:
+
+```shell
+PG_COMPAT=true cargo test --features=postgres --test sqllogictests
+```
+
## Running Tests: `tpch`
Test files in `tpch` directory runs against the `TPCH` data set (SF =
@@ -205,6 +215,34 @@ Then you need to add `INCLUDE_TPCH=true` to run tpch tests:
INCLUDE_TPCH=true cargo test --test sqllogictests
```
+## Running Tests: `sqlite`
+
+Test files in `data/sqlite` directory of the datafusion-testing crate were
+sourced from the [sqlite test suite](https://www.sqlite.org/sqllogictest/dir?ci=tip) and have been cleansed and updated to
+run within DataFusion's sqllogictest runner.
+
+To run the sqlite tests you need to increase the rust stack size and set
+`INCLUDE_SQLITE=true`:
+
+```shell
+export RUST_MIN_STACK=30485760;
+INCLUDE_SQLITE=true cargo test --test sqllogictests
+```
+
+Note that there are well over 5 million queries in these tests and running the
+sqlite tests will take a long time. You may wish to run them in release-nonlto mode:
+
+```shell
+INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests
+```
+
+The sqlite tests can also be run with the postgres runner to verify compatibility:
+
+```shell
+export RUST_MIN_STACK=30485760;
+PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test sqllogictests
+```
+
## Updating tests: Completion Mode
In test script completion mode, `sqllogictests` reads a prototype script and
runs the statements and queries against the
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs
index 066cc8ee98..498539c167 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -16,57 +16,129 @@
// under the License.
use clap::Parser;
+use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
+use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result};
+use datafusion_common_runtime::SpawnedTask;
use datafusion_sqllogictest::{DataFusion, TestContext};
use futures::stream::StreamExt;
+use indicatif::{
+    HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle,
+};
use itertools::Itertools;
-use log::info;
-use sqllogictest::{strict_column_validator, Normalizer};
+use log::Level::{Info, Warn};
+use log::{info, log_enabled, warn};
+#[cfg(feature = "postgres")]
+use once_cell::sync::Lazy;
+use sqllogictest::{
+    parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record,
+ Validator,
+};
+#[cfg(feature = "postgres")]
+use std::env::set_var;
use std::ffi::OsStr;
use std::fs;
+#[cfg(feature = "postgres")]
+use std::future::Future;
use std::path::{Path, PathBuf};
-
-use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result};
-use datafusion_common_runtime::SpawnedTask;
+#[cfg(feature = "postgres")]
+use std::{env, thread};
+#[cfg(feature = "postgres")]
+use testcontainers::core::IntoContainerPort;
+#[cfg(feature = "postgres")]
+use testcontainers::runners::AsyncRunner;
+#[cfg(feature = "postgres")]
+use testcontainers::ImageExt;
+#[cfg(feature = "postgres")]
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+#[cfg(feature = "postgres")]
+use tokio::sync::{mpsc, Mutex};
+#[cfg(feature = "postgres")]
+use ContainerCommands::{FetchHost, FetchPort};
const TEST_DIRECTORY: &str = "test_files/";
const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/";
const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
+const SQLITE_PREFIX: &str = "sqlite";
pub fn main() -> Result<()> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
- .build()
- .unwrap()
+ .build()?
.block_on(run_tests())
}
+// Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not
+// If particular test wants to cover trailing whitespace on a value,
+// it should project additional non-whitespace column on the right.
#[allow(clippy::ptr_arg)]
-fn normalizer(s: &String) -> String {
-    // Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not
- // If particular test wants to cover trailing whitespace on a value,
- // it should project additional non-whitespace column on the right.
- s.trim_end().to_owned()
+fn value_normalizer(s: &String) -> String {
+ s.trim_end().to_string()
}
-fn value_validator(
+fn sqlite_value_validator(
normalizer: Normalizer,
actual: &[Vec<String>],
expected: &[String],
) -> bool {
- let expected = expected.iter().map(normalizer).collect::<Vec<_>>();
- let actual = actual
+    let normalized_expected = expected.iter().map(normalizer).collect::<Vec<_>>();
+ let normalized_actual = actual
+ .iter()
+ .map(|strs| strs.iter().map(normalizer).join(" "))
+ .collect_vec();
+
+ if log_enabled!(Info) && normalized_actual != normalized_expected {
+ info!("sqlite validation failed. actual vs expected:");
+ for i in 0..normalized_actual.len() {
+ info!("[{i}] {}<eol>", normalized_actual[i]);
+ info!(
+ "[{i}] {}<eol>",
+                if i < normalized_expected.len() {
+ &normalized_expected[i]
+ } else {
+ "No more results"
+ }
+ );
+ }
+ }
+
+ normalized_actual == normalized_expected
+}
+
+fn df_value_validator(
+ normalizer: Normalizer,
+ actual: &[Vec<String>],
+ expected: &[String],
+) -> bool {
+    let normalized_expected = expected.iter().map(normalizer).collect::<Vec<_>>();
+ let normalized_actual = actual
.iter()
.map(|strs| strs.iter().join(" "))
-        // Editors do not preserve trailing whitespace, so expected may or may not lack it included
- .map(|str| normalizer(&str))
- .collect::<Vec<_>>();
- actual == expected
+ .map(|str| str.trim_end().to_string())
+ .collect_vec();
+
+ if log_enabled!(Warn) && normalized_actual != normalized_expected {
+ warn!("df validation failed. actual vs expected:");
+ for i in 0..normalized_actual.len() {
+ warn!("[{i}] {}<eol>", normalized_actual[i]);
+ warn!(
+ "[{i}] {}<eol>",
+                if i < normalized_expected.len() {
+ &normalized_expected[i]
+ } else {
+ "No more results"
+ }
+ );
+ }
+ }
+
+ normalized_actual == normalized_expected
}
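For readers skimming the diff: both validators above reduce to "normalize each value, join each row's columns with a single space, then compare line-by-line against the expected strings". A self-contained, stdlib-only sketch of that comparison (helper names here are illustrative, not the crate's API):

```rust
// Mirrors `value_normalizer`: trailing whitespace is dropped before comparing.
fn normalize(s: &str) -> String {
    s.trim_end().to_string()
}

// Each actual row (a Vec of column values) is joined with one space and then
// compared against the corresponding normalized expected line.
fn rows_match(actual: &[Vec<String>], expected: &[String]) -> bool {
    let normalized_actual: Vec<String> = actual
        .iter()
        .map(|cols| {
            cols.iter()
                .map(|c| normalize(c))
                .collect::<Vec<_>>()
                .join(" ")
        })
        .collect();
    let normalized_expected: Vec<String> =
        expected.iter().map(|e| normalize(e)).collect();
    normalized_actual == normalized_expected
}

fn main() {
    let actual = vec![vec!["1".to_string(), "foo ".to_string()]];
    let expected = vec!["1 foo".to_string()];
    // trailing whitespace on "foo " is normalized away, so this matches
    println!("{}", rows_match(&actual, &expected));
}
```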
/// Sets up an empty directory at test_files/scratch/<name>
/// creating it if needed and clearing any file contents if it exists
/// This allows tests for inserting to external tables or copy to
-/// to persist data to disk and have consistent state when running
+/// persist data to disk and have consistent state when running
/// a new test
fn setup_scratch_dir(name: &Path) -> Result<()> {
// go from copy.slt --> copy
@@ -97,23 +169,89 @@ async fn run_tests() -> Result<()> {
}
options.warn_on_ignored();
+ #[cfg(feature = "postgres")]
+ let start_pg_database = options.postgres_runner && !is_pg_uri_set();
+ #[cfg(feature = "postgres")]
+ if start_pg_database {
+ info!("Starting postgres db ...");
+
+ thread::spawn(|| {
+ execute_blocking(start_postgres(
+ &POSTGRES_IN,
+ &POSTGRES_HOST,
+ &POSTGRES_PORT,
+ &POSTGRES_STOPPED,
+ ))
+ });
+
+ POSTGRES_IN.tx.send(FetchHost).unwrap();
+ let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
+
+ POSTGRES_IN.tx.send(FetchPort).unwrap();
+ let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
+
+        let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
+ info!("Postgres uri is {pg_uri}");
+
+ set_var("PG_URI", pg_uri);
+ }
+
// Run all tests in parallel, reporting failures at the end
//
// Doing so is safe because each slt file runs with its own
// `SessionContext` and should not have side effects (like
// modifying shared state like `/tmp/`)
+    let m = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(1));
+ let m_style = ProgressStyle::with_template(
+ "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}",
+ )
+ .unwrap()
+ .progress_chars("##-");
+
+ let start = Instant::now();
+
let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?)
.map(|test_file| {
+ let validator = if options.include_sqlite
+ && test_file.relative_path.starts_with(SQLITE_PREFIX)
+ {
+ sqlite_value_validator
+ } else {
+ df_value_validator
+ };
+
+ let m_clone = m.clone();
+ let m_style_clone = m_style.clone();
+
SpawnedTask::spawn(async move {
- let file_path = test_file.relative_path.clone();
- let start = datafusion::common::instant::Instant::now();
match (options.postgres_runner, options.complete) {
- (false, false) => run_test_file(test_file).await?,
- (false, true) => run_complete_file(test_file).await?,
-            (true, false) => run_test_file_with_postgres(test_file).await?,
-            (true, true) => run_complete_file_with_postgres(test_file).await?,
+ (false, false) => {
+                    run_test_file(test_file, validator, m_clone, m_style_clone)
+ .await?
+ }
+ (false, true) => {
+                    run_complete_file(test_file, validator, m_clone, m_style_clone)
+ .await?
+ }
+ (true, false) => {
+ run_test_file_with_postgres(
+ test_file,
+ validator,
+ m_clone,
+ m_style_clone,
+ )
+ .await?
+ }
+ (true, true) => {
+ run_complete_file_with_postgres(
+ test_file,
+ validator,
+ m_clone,
+ m_style_clone,
+ )
+ .await?
+ }
}
-            println!("Executed {:?}. Took {:?}", file_path, start.elapsed());
Ok(()) as Result<()>
})
.join()
@@ -136,6 +274,15 @@ async fn run_tests() -> Result<()> {
.collect()
.await;
+ m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?;
+
+ #[cfg(feature = "postgres")]
+ if start_pg_database {
+ println!("Stopping postgres db ...");
+ POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
+ POSTGRES_STOPPED.rx.lock().await.recv().await;
+ }
+
// report on any errors
if !errors.is_empty() {
for e in &errors {
@@ -147,60 +294,148 @@ async fn run_tests() -> Result<()> {
}
}
-async fn run_test_file(test_file: TestFile) -> Result<()> {
+#[cfg(feature = "postgres")]
+fn is_pg_uri_set() -> bool {
+ match env::var("PG_URI") {
+ Ok(_) => true,
+ Err(_) => false,
+ }
+}
+
+async fn run_test_file(
+ test_file: TestFile,
+ validator: Validator,
+ mp: MultiProgress,
+ mp_style: ProgressStyle,
+) -> Result<()> {
let TestFile {
path,
relative_path,
} = test_file;
- info!("Running with DataFusion runner: {}", path.display());
let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else {
info!("Skipping: {}", path.display());
return Ok(());
};
setup_scratch_dir(&relative_path)?;
+
+ let count: u64 = get_record_count(&path, "Datafusion".to_string());
+ let pb = mp.add(ProgressBar::new(count));
+
+ pb.set_style(mp_style);
+ pb.set_message(format!("{:?}", &relative_path));
+
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
relative_path.clone(),
+ pb.clone(),
))
});
+ runner.add_label("Datafusion");
runner.with_column_validator(strict_column_validator);
- runner.with_normalizer(normalizer);
- runner.with_validator(value_validator);
- runner
+ runner.with_normalizer(value_normalizer);
+ runner.with_validator(validator);
+
+ let res = runner
.run_file_async(path)
.await
- .map_err(|e| DataFusionError::External(Box::new(e)))
+ .map_err(|e| DataFusionError::External(Box::new(e)));
+
+ pb.finish_and_clear();
+
+ res
+}
+
+fn get_record_count(path: &PathBuf, label: String) -> u64 {
+ let records: Vec<Record<<DataFusion as AsyncDB>::ColumnType>> =
+ parse_file(path).unwrap();
+ let mut count: u64 = 0;
+
+ records.iter().for_each(|rec| match rec {
+ Record::Query { conditions, .. } => {
+ if conditions.is_empty()
+ || !conditions.contains(&Condition::SkipIf {
+ label: label.clone(),
+ })
+ || conditions.contains(&Condition::OnlyIf {
+ label: label.clone(),
+ })
+ {
+ count += 1;
+ }
+ }
+ Record::Statement { conditions, .. } => {
+ if conditions.is_empty()
+ || !conditions.contains(&Condition::SkipIf {
+ label: label.clone(),
+ })
+ || conditions.contains(&Condition::OnlyIf {
+ label: label.clone(),
+ })
+ {
+ count += 1;
+ }
+ }
+ _ => {}
+ });
+
+ count
}
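The counting rule in `get_record_count` above is what sizes each progress bar: a query or statement counts unless it carries a `skipif <label>` condition for this runner, and an `onlyif <label>` for this runner re-includes it. A stdlib-only sketch of that predicate (the `Condition` enum here is a simplified stand-in, not sqllogictest's actual type):

```rust
// Simplified stand-in for sqllogictest's record conditions.
#[derive(PartialEq)]
enum Condition {
    SkipIf(String),
    OnlyIf(String),
}

// A record counts toward the progress-bar total unless it is skipped for
// this runner's label (an explicit onlyif for the label wins back inclusion).
fn counts_for(conditions: &[Condition], label: &str) -> bool {
    conditions.is_empty()
        || !conditions.contains(&Condition::SkipIf(label.to_string()))
        || conditions.contains(&Condition::OnlyIf(label.to_string()))
}

fn main() {
    let skip = vec![Condition::SkipIf("Datafusion".to_string())];
    println!("{}", counts_for(&[], "Datafusion")); // unconditional record
    println!("{}", counts_for(&skip, "Datafusion")); // skipped for this runner
}
```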
#[cfg(feature = "postgres")]
-async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
+async fn run_test_file_with_postgres(
+ test_file: TestFile,
+ validator: Validator,
+ mp: MultiProgress,
+ mp_style: ProgressStyle,
+) -> Result<()> {
use datafusion_sqllogictest::Postgres;
let TestFile {
path,
relative_path,
} = test_file;
- info!("Running with Postgres runner: {}", path.display());
setup_scratch_dir(&relative_path)?;
- let mut runner =
- sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
+
+ let count: u64 = get_record_count(&path, "postgresql".to_string());
+ let pb = mp.add(ProgressBar::new(count));
+
+ pb.set_style(mp_style);
+ pb.set_message(format!("{:?}", &relative_path));
+
+ let mut runner = sqllogictest::Runner::new(|| {
+ Postgres::connect(relative_path.clone(), pb.clone())
+ });
+ runner.add_label("postgres");
runner.with_column_validator(strict_column_validator);
- runner.with_normalizer(normalizer);
- runner.with_validator(value_validator);
+ runner.with_normalizer(value_normalizer);
+ runner.with_validator(validator);
runner
.run_file_async(path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+ pb.finish_and_clear();
+
Ok(())
}
#[cfg(not(feature = "postgres"))]
-async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> {
+async fn run_test_file_with_postgres(
+ _test_file: TestFile,
+ _validator: Validator,
+ _mp: MultiProgress,
+ _mp_style: ProgressStyle,
+) -> Result<()> {
use datafusion_common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}
-async fn run_complete_file(test_file: TestFile) -> Result<()> {
+async fn run_complete_file(
+ test_file: TestFile,
+ validator: Validator,
+ mp: MultiProgress,
+ mp_style: ProgressStyle,
+) -> Result<()> {
let TestFile {
path,
relative_path,
@@ -213,30 +448,48 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> {
return Ok(());
};
setup_scratch_dir(&relative_path)?;
+
+ let count: u64 = get_record_count(&path, "Datafusion".to_string());
+ let pb = mp.add(ProgressBar::new(count));
+
+ pb.set_style(mp_style);
+ pb.set_message(format!("{:?}", &relative_path));
+
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
relative_path.clone(),
+ pb.clone(),
))
});
+
let col_separator = " ";
- runner
+ let res = runner
.update_test_file(
path,
col_separator,
- value_validator,
- normalizer,
+ validator,
+ value_normalizer,
strict_column_validator,
)
.await
// Can't use e directly because it isn't marked Send, so turn it into a string.
.map_err(|e| {
DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
- })
+ });
+
+ pb.finish_and_clear();
+
+ res
}
#[cfg(feature = "postgres")]
-async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> {
+async fn run_complete_file_with_postgres(
+ test_file: TestFile,
+ validator: Validator,
+ mp: MultiProgress,
+ mp_style: ProgressStyle,
+) -> Result<()> {
use datafusion_sqllogictest::Postgres;
let TestFile {
path,
@@ -247,26 +500,48 @@ async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> {
path.display()
);
setup_scratch_dir(&relative_path)?;
- let mut runner =
- sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone()));
+
+ let count: u64 = get_record_count(&path, "postgresql".to_string());
+ let pb = mp.add(ProgressBar::new(count));
+
+ pb.set_style(mp_style);
+ pb.set_message(format!("{:?}", &relative_path));
+
+ let mut runner = sqllogictest::Runner::new(|| {
+ Postgres::connect(relative_path.clone(), pb.clone())
+ });
+ runner.add_label("postgres");
+ runner.with_column_validator(strict_column_validator);
+ runner.with_normalizer(value_normalizer);
+ runner.with_validator(validator);
+
let col_separator = " ";
- runner
+ let res = runner
.update_test_file(
path,
col_separator,
- value_validator,
- normalizer,
+ validator,
+ value_normalizer,
strict_column_validator,
)
.await
// Can't use e directly because it isn't marked Send, so turn it into a string.
.map_err(|e| {
DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
- })
+ });
+
+ pb.finish_and_clear();
+
+ res
}
#[cfg(not(feature = "postgres"))]
-async fn run_complete_file_with_postgres(_test_file: TestFile) -> Result<()> {
+async fn run_complete_file_with_postgres(
+ _test_file: TestFile,
+ _validator: Validator,
+ _mp: MultiProgress,
+ _mp_style: ProgressStyle,
+) -> Result<()> {
use datafusion_common::plan_err;
plan_err!("Can not run with postgres as postgres feature is not enabled")
}
@@ -282,11 +557,14 @@ struct TestFile {
impl TestFile {
fn new(path: PathBuf) -> Self {
- let relative_path = PathBuf::from(
- path.to_string_lossy()
- .strip_prefix(TEST_DIRECTORY)
- .unwrap_or(""),
- );
+ let p = path.to_string_lossy();
+ let relative_path = PathBuf::from(if p.starts_with(TEST_DIRECTORY) {
+ p.strip_prefix(TEST_DIRECTORY).unwrap()
+ } else if p.starts_with(DATAFUSION_TESTING_TEST_DIRECTORY) {
+ p.strip_prefix(DATAFUSION_TESTING_TEST_DIRECTORY).unwrap()
+ } else {
+ ""
+ });
Self {
path,
@@ -298,6 +576,14 @@ impl TestFile {
self.path.extension() == Some(OsStr::new("slt"))
}
+ fn check_sqlite(&self, options: &Options) -> bool {
+ if !self.relative_path.starts_with(SQLITE_PREFIX) {
+ return true;
+ }
+
+ options.include_sqlite
+ }
+
fn check_tpch(&self, options: &Options) -> bool {
if !self.relative_path.starts_with("tpch") {
return true;
@@ -310,15 +596,29 @@ impl TestFile {
fn read_test_files<'a>(
options: &'a Options,
) -> Result<Box<dyn Iterator<Item = TestFile> + 'a>> {
- Ok(Box::new(
- read_dir_recursive(TEST_DIRECTORY)?
+ let mut paths = read_dir_recursive(TEST_DIRECTORY)?
+ .into_iter()
+ .map(TestFile::new)
+ .filter(|f| options.check_test_file(&f.relative_path))
+ .filter(|f| f.is_slt_file())
+ .filter(|f| f.check_tpch(options))
+ .filter(|f| f.check_sqlite(options))
+ .filter(|f| options.check_pg_compat_file(f.path.as_path()))
+ .collect::<Vec<_>>();
+ if options.include_sqlite {
+        let mut sqlite_paths = read_dir_recursive(DATAFUSION_TESTING_TEST_DIRECTORY)?
.into_iter()
.map(TestFile::new)
.filter(|f| options.check_test_file(&f.relative_path))
.filter(|f| f.is_slt_file())
- .filter(|f| f.check_tpch(options))
- .filter(|f| options.check_pg_compat_file(f.path.as_path())),
- ))
+ .filter(|f| f.check_sqlite(options))
+ .filter(|f| options.check_pg_compat_file(f.path.as_path()))
+ .collect::<Vec<_>>();
+
+ paths.append(&mut sqlite_paths)
+ }
+
+ Ok(Box::new(paths.into_iter()))
}
fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>> {
@@ -350,7 +650,7 @@ fn read_dir_recursive_impl(dst: &mut Vec<PathBuf>, path: &Path) -> Result<()> {
/// Parsed command line options
///
-/// This structure attempts to mimic the command line options of the built in rust test runner
+/// This structure attempts to mimic the command line options of the built-in rust test runner
/// accepted by IDEs such as CLion that pass arguments
///
/// See <https://github.com/apache/datafusion/issues/8287> for more details
@@ -367,6 +667,9 @@ struct Options {
)]
postgres_runner: bool,
+ #[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")]
+ include_sqlite: bool,
+
#[clap(long, env = "INCLUDE_TPCH", help = "Include tpch files")]
include_tpch: bool,
@@ -431,10 +734,13 @@ impl Options {
.any(|filter| relative_path.to_string_lossy().contains(filter))
}
- /// Postgres runner executes only tests in files with specific names
+ /// Postgres runner executes only tests in files with specific names or in
+ /// specific folders
fn check_pg_compat_file(&self, path: &Path) -> bool {
let file_name = path.file_name().unwrap().to_str().unwrap().to_string();
- !self.postgres_runner || file_name.starts_with(PG_COMPAT_FILE_PREFIX)
+ !self.postgres_runner
+ || file_name.starts_with(PG_COMPAT_FILE_PREFIX)
+            || (self.include_sqlite && path.to_string_lossy().contains(SQLITE_PREFIX))
}
/// Logs warning messages to stdout if any ignored options are passed
@@ -452,3 +758,87 @@ impl Options {
}
}
}
+
+#[cfg(feature = "postgres")]
+pub async fn start_postgres(
+ in_channel: &Channel<ContainerCommands>,
+ host_channel: &Channel<String>,
+ port_channel: &Channel<u16>,
+ stopped_channel: &Channel<()>,
+) {
+    info!("Starting postgres test container with user postgres/postgres and db test");
+
+ let container = testcontainers_modules::postgres::Postgres::default()
+ .with_user("postgres")
+ .with_password("postgres")
+ .with_db_name("test")
+ .with_mapped_port(16432, 5432.tcp())
+ .with_tag("17-alpine")
+ .start()
+ .await
+ .unwrap();
+ // uncomment this if you are running docker in docker
+ // let host = "host.docker.internal".to_string();
+ let host = container.get_host().await.unwrap().to_string();
+ let port = container.get_host_port_ipv4(5432).await.unwrap();
+
+ let mut rx = in_channel.rx.lock().await;
+ while let Some(command) = rx.recv().await {
+ match command {
+ FetchHost => host_channel.tx.send(host.clone()).unwrap(),
+ FetchPort => port_channel.tx.send(port).unwrap(),
+ ContainerCommands::Stop => {
+ container.stop().await.unwrap();
+ stopped_channel.tx.send(()).unwrap();
+ rx.close();
+ }
+ }
+ }
+}
+
+#[cfg(feature = "postgres")]
+#[derive(Debug)]
+pub enum ContainerCommands {
+ FetchHost,
+ FetchPort,
+ Stop,
+}
+
+#[cfg(feature = "postgres")]
+pub struct Channel<T> {
+ pub tx: UnboundedSender<T>,
+ pub rx: Mutex<UnboundedReceiver<T>>,
+}
+
+#[cfg(feature = "postgres")]
+pub fn channel<T>() -> Channel<T> {
+ let (tx, rx) = mpsc::unbounded_channel();
+ Channel {
+ tx,
+ rx: Mutex::new(rx),
+ }
+}
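The `Channel` type above pairs a sender with a mutex-guarded receiver so many tasks can send commands to the single container-management loop, while dedicated channels carry the host/port answers back (the `POSTGRES_IN` / `POSTGRES_HOST` / `POSTGRES_PORT` statics). A stdlib-only analog of that request/response flow, using `std::sync::mpsc` and a thread in place of tokio's unbounded channels and async task:

```rust
use std::sync::mpsc;
use std::thread;

// Commands the "container owner" thread understands, mirroring
// ContainerCommands in the runner (FetchHost omitted for brevity).
enum Command {
    FetchPort,
    Stop,
}

fn main() {
    let (cmd_tx, cmd_rx) = mpsc::channel::<Command>();
    let (port_tx, port_rx) = mpsc::channel::<u16>();

    // One thread owns the container state and answers requests until Stop.
    let handle = thread::spawn(move || {
        let port = 16432; // hypothetical mapped host port
        for cmd in cmd_rx {
            match cmd {
                Command::FetchPort => port_tx.send(port).unwrap(),
                Command::Stop => break,
            }
        }
    });

    // A caller sends a command, then blocks on the response channel.
    cmd_tx.send(Command::FetchPort).unwrap();
    let port = port_rx.recv().unwrap();
    println!("{port}");

    cmd_tx.send(Command::Stop).unwrap();
    handle.join().unwrap();
}
```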
+
+#[cfg(feature = "postgres")]
+pub fn execute_blocking<F: Future>(f: F) {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(f);
+}
+
+#[cfg(feature = "postgres")]
+pub struct HostPort {
+ pub host: String,
+ pub port: u16,
+}
+
+#[cfg(feature = "postgres")]
+static POSTGRES_IN: Lazy<Channel<ContainerCommands>> = Lazy::new(channel);
+#[cfg(feature = "postgres")]
+static POSTGRES_HOST: Lazy<Channel<String>> = Lazy::new(channel);
+#[cfg(feature = "postgres")]
+static POSTGRES_PORT: Lazy<Channel<u16>> = Lazy::new(channel);
+#[cfg(feature = "postgres")]
+static POSTGRES_STOPPED: Lazy<Channel<()>> = Lazy::new(channel);
diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
index 5c24b49cfe..e696058484 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
@@ -18,26 +18,49 @@
use std::sync::Arc;
use std::{path::PathBuf, time::Duration};
+use super::{error::Result, normalize, DFSqlLogicTestError};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
-use log::info;
+use indicatif::ProgressBar;
+use log::Level::{Debug, Info};
+use log::{debug, log_enabled, warn};
use sqllogictest::DBOutput;
-
-use super::{error::Result, normalize, DFSqlLogicTestError};
+use tokio::time::Instant;
use crate::engines::output::{DFColumnType, DFOutput};
pub struct DataFusion {
ctx: SessionContext,
relative_path: PathBuf,
+ pb: ProgressBar,
}
impl DataFusion {
- pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self {
- Self { ctx, relative_path }
+    pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self {
+ Self {
+ ctx,
+ relative_path,
+ pb,
+ }
+ }
+
+ fn update_slow_count(&self) {
+ let msg = self.pb.message();
+ let split: Vec<&str> = msg.split(" ").collect();
+ let mut current_count = 0;
+
+ if split.len() > 2 {
+ // third match will be current slow count
+ current_count = split[2].parse::<i32>().unwrap();
+ }
+
+ current_count += 1;
+
+ self.pb
+            .set_message(format!("{} - {} took > 500 ms", split[0], current_count));
}
}
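The `update_slow_count` helper above keeps the slow-query tally inside the progress-bar message itself: the message starts as just the file name, becomes `<file> - <count> took > 500 ms` after the first slow query, and later calls re-parse the third whitespace-separated token to bump the count (this assumes, as the real code does, that the file name contains no spaces). A stdlib-only sketch of that round-trip:

```rust
// Bump the slow-query count embedded in a progress-bar message.
// First call: message is just the file name, so the count starts at 1.
// Later calls: the third space-separated token is the previous count.
fn bump_slow_count(msg: &str) -> String {
    let split: Vec<&str> = msg.split(' ').collect();
    let mut current = 0;
    if split.len() > 2 {
        current = split[2].parse::<i32>().unwrap_or(0);
    }
    format!("{} - {} took > 500 ms", split[0], current + 1)
}

fn main() {
    let first = bump_slow_count("select.slt");
    println!("{first}");
    println!("{}", bump_slow_count(&first));
}
```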
@@ -47,12 +70,32 @@ impl sqllogictest::AsyncDB for DataFusion {
type ColumnType = DFColumnType;
async fn run(&mut self, sql: &str) -> Result<DFOutput> {
- info!(
- "[{}] Running query: \"{}\"",
- self.relative_path.display(),
- sql
- );
- run_query(&self.ctx, sql).await
+ if log_enabled!(Debug) {
+ debug!(
+ "[{}] Running query: \"{}\"",
+ self.relative_path.display(),
+ sql
+ );
+ }
+
+ let start = Instant::now();
+ let result = run_query(&self.ctx, sql).await;
+ let duration = start.elapsed();
+
+ if duration.gt(&Duration::from_millis(500)) {
+ self.update_slow_count();
+ }
+
+ self.pb.inc(1);
+
+ if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) {
+ warn!(
+                "[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"",
+ self.relative_path.display()
+ );
+ }
+
+ result
}
/// Engine name of current database.
diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
index a490488cd7..1439695d62 100644
--- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
@@ -15,22 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-/// Postgres engine implementation for sqllogictest.
-use std::path::{Path, PathBuf};
-use std::str::FromStr;
-
use async_trait::async_trait;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
-use log::debug;
+use log::{debug, info};
use sqllogictest::DBOutput;
+/// Postgres engine implementation for sqllogictest.
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
+use std::time::Duration;
use tokio::task::JoinHandle;
use super::conversion::*;
use crate::engines::output::{DFColumnType, DFOutput};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
+use indicatif::ProgressBar;
use postgres_types::Type;
use rust_decimal::Decimal;
+use tokio::time::Instant;
use tokio_postgres::{Column, Row};
use types::PgRegtype;
@@ -55,6 +57,7 @@ pub struct Postgres {
join_handle: JoinHandle<()>,
/// Relative test file path
relative_path: PathBuf,
+ pb: ProgressBar,
}
impl Postgres {
@@ -71,11 +74,11 @@ impl Postgres {
/// ```
///
/// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format
- pub async fn connect(relative_path: PathBuf) -> Result<Self> {
+    pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> Result<Self> {
let uri = std::env::var("PG_URI").map_or(PG_URI.to_string(), std::convert::identity);
- debug!("Using postgres connection string: {uri}");
+ info!("Using postgres connection string: {uri}");
let config = tokio_postgres::Config::from_str(&uri)?;
@@ -113,6 +116,7 @@ impl Postgres {
client,
join_handle,
relative_path,
+ pb,
})
}
@@ -181,6 +185,22 @@ impl Postgres {
tx.commit().await?;
Ok(DBOutput::StatementComplete(0))
}
+
+ fn update_slow_count(&self) {
+ let msg = self.pb.message();
+ let split: Vec<&str> = msg.split(" ").collect();
+ let mut current_count = 0;
+
+ if split.len() > 2 {
+            // third match will be current slow count
+ current_count += split[2].parse::<i32>().unwrap();
+ }
+
+ current_count += 1;
+
+ self.pb
+            .set_message(format!("{} - {} took > 500 ms", split[0], current_count));
+ }
}
/// remove single quotes from the start and end of the string
@@ -194,16 +214,13 @@ fn no_quotes(t: &str) -> &str {
/// return a schema name
fn schema_name(relative_path: &Path) -> String {
relative_path
- .file_name()
- .map(|name| {
- name.to_string_lossy()
- .chars()
- .filter(|ch| ch.is_ascii_alphabetic())
- .collect::<String>()
- .trim_start_matches("pg_")
- .to_string()
- })
- .unwrap_or_else(|| "default_schema".to_string())
+ .to_string_lossy()
+ .to_string()
+ .chars()
+ .filter(|ch| ch.is_ascii_alphanumeric())
+ .collect::<String>()
+ .trim_start_matches("pg_")
+ .to_string()
}
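The reworked `schema_name` above derives a postgres schema from the whole relative path by keeping only ASCII alphanumerics (so `/`, `.`, and `_` are dropped); note that once underscores are filtered out, the trailing `trim_start_matches("pg_")` can never match, so the filtering step carries all the behavior. A stdlib-only sketch of that core step:

```rust
// Reduce a relative test-file path to an identifier-safe schema name by
// keeping only ASCII alphanumeric characters ('/', '.', '_' are dropped),
// so each test file maps to a distinct postgres schema.
fn schema_name(relative_path: &str) -> String {
    relative_path
        .chars()
        .filter(|ch| ch.is_ascii_alphanumeric())
        .collect()
}

fn main() {
    println!("{}", schema_name("sqlite/select1.slt"));
}
```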
impl Drop for Postgres {
@@ -221,7 +238,7 @@ impl sqllogictest::AsyncDB for Postgres {
&mut self,
sql: &str,
) -> Result<DBOutput<Self::ColumnType>, Self::Error> {
- println!(
+ debug!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
@@ -242,14 +259,24 @@ impl sqllogictest::AsyncDB for Postgres {
};
if lower_sql.starts_with("copy") {
+ self.pb.inc(1);
return self.run_copy_command(sql).await;
}
if !is_query_sql {
self.client.execute(sql, &[]).await?;
+ self.pb.inc(1);
return Ok(DBOutput::StatementComplete(0));
}
+ let start = Instant::now();
let rows = self.client.query(sql, &[]).await?;
+ let duration = start.elapsed();
+
+ if duration.gt(&Duration::from_millis(500)) {
+ self.update_slow_count();
+ }
+
+ self.pb.inc(1);
let types: Vec<Type> = if rows.is_empty() {
self.client
diff --git a/docs/source/contributor-guide/getting_started.md b/docs/source/contributor-guide/getting_started.md
index 696d6d3a0f..5d85e07f3e 100644
--- a/docs/source/contributor-guide/getting_started.md
+++ b/docs/source/contributor-guide/getting_started.md
@@ -74,7 +74,7 @@ Testing setup:
- `rustup update stable` DataFusion uses the latest stable release of rust
- `git submodule init`
-- `git submodule update`
+- `git submodule update --init --remote --recursive`
Formatting instructions:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]