This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 858c020423 Extract postgres container from sqllogictest, update
datafusion-testing pin (#13971)
858c020423 is described below
commit 858c020423ac7fadd210fbc3983eb91e86951a40
Author: Bruce Ritchie <[email protected]>
AuthorDate: Thu Jan 2 13:29:26 2025 -0500
Extract postgres container from sqllogictest, update datafusion-testing pin
(#13971)
* Add support for sqlite test files to sqllogictest
* Removed workaround for bug that was fixed.
* Refactor sqllogictest to extract postgres functionality into a separate
file. Removed dependency on once_cell in favour of LazyLock.
* Add missing license header.
---
datafusion-testing | 2 +-
datafusion/sqllogictest/Cargo.toml | 2 -
datafusion/sqllogictest/bin/postgres_container.rs | 151 +++++++++++++++++++++
datafusion/sqllogictest/bin/sqllogictests.rs | 148 ++------------------
.../src/engines/postgres_engine/mod.rs | 1 -
5 files changed, 160 insertions(+), 144 deletions(-)
diff --git a/datafusion-testing b/datafusion-testing
index e2e320c947..5cc59cecee 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304
+Subproject commit 5cc59ceceeebeea6b39861210b6d1cd27e66648a
diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml
index 1bb88a8bd4..3104846eda 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -49,7 +49,6 @@ indicatif = "0.17"
itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
-once_cell = { version = "1.20", optional = true }
postgres-protocol = { version = "0.6.7", optional = true }
postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true }
rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
@@ -69,7 +68,6 @@ avro = ["datafusion/avro"]
postgres = [
"bytes",
"chrono",
- "once_cell",
"postgres-types",
"postgres-protocol",
"testcontainers",
diff --git a/datafusion/sqllogictest/bin/postgres_container.rs b/datafusion/sqllogictest/bin/postgres_container.rs
new file mode 100644
index 0000000000..210b9b3e36
--- /dev/null
+++ b/datafusion/sqllogictest/bin/postgres_container.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![cfg(feature = "postgres")]
+
+use crate::Options;
+use datafusion_common::Result;
+use log::info;
+use std::env::set_var;
+use std::future::Future;
+use std::sync::LazyLock;
+use std::{env, thread};
+use testcontainers::core::IntoContainerPort;
+use testcontainers::runners::AsyncRunner;
+use testcontainers::ImageExt;
+use testcontainers_modules::postgres;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::{mpsc, Mutex};
+use ContainerCommands::{FetchHost, FetchPort};
+
+#[derive(Debug)]
+pub enum ContainerCommands {
+ FetchHost,
+ FetchPort,
+ Stop,
+}
+
+pub struct Channel<T> {
+ pub tx: UnboundedSender<T>,
+ pub rx: Mutex<UnboundedReceiver<T>>,
+}
+
+pub fn channel<T>() -> Channel<T> {
+ let (tx, rx) = mpsc::unbounded_channel();
+ Channel {
+ tx,
+ rx: Mutex::new(rx),
+ }
+}
+
+pub fn execute_blocking<F: Future>(f: F) {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ .block_on(f);
+}
+
+static POSTGRES_IN: LazyLock<Channel<ContainerCommands>> = LazyLock::new(channel);
+static POSTGRES_HOST: LazyLock<Channel<String>> = LazyLock::new(channel);
+static POSTGRES_PORT: LazyLock<Channel<u16>> = LazyLock::new(channel);
+static POSTGRES_STOPPED: LazyLock<Channel<()>> = LazyLock::new(channel);
+
+pub async fn initialize_postgres_container(options: &Options) -> Result<()> {
+ let start_pg_database = options.postgres_runner && !is_pg_uri_set();
+ if start_pg_database {
+ info!("Starting postgres db ...");
+
+ thread::spawn(|| {
+ execute_blocking(start_postgres(
+ &POSTGRES_IN,
+ &POSTGRES_HOST,
+ &POSTGRES_PORT,
+ &POSTGRES_STOPPED,
+ ))
+ });
+
+ POSTGRES_IN.tx.send(FetchHost).unwrap();
+ let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
+
+ POSTGRES_IN.tx.send(FetchPort).unwrap();
+ let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
+
+ let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
+ info!("Postgres uri is {pg_uri}");
+
+ set_var("PG_URI", pg_uri);
+ } else {
+ // close receiver
+ POSTGRES_IN.rx.lock().await.close();
+ }
+
+ Ok(())
+}
+
+pub async fn terminate_postgres_container() -> Result<()> {
+ if !POSTGRES_IN.tx.is_closed() {
+ println!("Stopping postgres db ...");
+ POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
+ POSTGRES_STOPPED.rx.lock().await.recv().await;
+ }
+
+ Ok(())
+}
+
+async fn start_postgres(
+ in_channel: &Channel<ContainerCommands>,
+ host_channel: &Channel<String>,
+ port_channel: &Channel<u16>,
+ stopped_channel: &Channel<()>,
+) {
+ info!("Starting postgres test container with user postgres/postgres and db test");
+
+ let container = postgres::Postgres::default()
+ .with_user("postgres")
+ .with_password("postgres")
+ .with_db_name("test")
+ .with_mapped_port(16432, 5432.tcp())
+ .with_tag("17-alpine")
+ .start()
+ .await
+ .unwrap();
+ // uncomment this if you are running docker in docker
+ let host = "host.docker.internal".to_string();
+ // let host = container.get_host().await.unwrap().to_string();
+ let port = container.get_host_port_ipv4(5432).await.unwrap();
+
+ let mut rx = in_channel.rx.lock().await;
+ while let Some(command) = rx.recv().await {
+ match command {
+ FetchHost => host_channel.tx.send(host.clone()).unwrap(),
+ FetchPort => port_channel.tx.send(port).unwrap(),
+ ContainerCommands::Stop => {
+ container.stop().await.unwrap();
+ stopped_channel.tx.send(()).unwrap();
+ rx.close();
+ }
+ }
+ }
+}
+
+fn is_pg_uri_set() -> bool {
+ match env::var("PG_URI") {
+ Ok(_) => true,
+ Err(_) => false,
+ }
+}
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs
index 498539c167..f6b35bf377 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -28,33 +28,21 @@ use indicatif::{
use itertools::Itertools;
use log::Level::{Info, Warn};
use log::{info, log_enabled, warn};
-#[cfg(feature = "postgres")]
-use once_cell::sync::Lazy;
use sqllogictest::{
parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record,
Validator,
};
+
#[cfg(feature = "postgres")]
-use std::env::set_var;
+use crate::postgres_container::{
+ initialize_postgres_container, terminate_postgres_container,
+};
use std::ffi::OsStr;
use std::fs;
-#[cfg(feature = "postgres")]
-use std::future::Future;
use std::path::{Path, PathBuf};
+
#[cfg(feature = "postgres")]
-use std::{env, thread};
-#[cfg(feature = "postgres")]
-use testcontainers::core::IntoContainerPort;
-#[cfg(feature = "postgres")]
-use testcontainers::runners::AsyncRunner;
-#[cfg(feature = "postgres")]
-use testcontainers::ImageExt;
-#[cfg(feature = "postgres")]
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
-#[cfg(feature = "postgres")]
-use tokio::sync::{mpsc, Mutex};
-#[cfg(feature = "postgres")]
-use ContainerCommands::{FetchHost, FetchPort};
+mod postgres_container;
const TEST_DIRECTORY: &str = "test_files/";
const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/";
@@ -170,31 +158,7 @@ async fn run_tests() -> Result<()> {
options.warn_on_ignored();
#[cfg(feature = "postgres")]
- let start_pg_database = options.postgres_runner && !is_pg_uri_set();
- #[cfg(feature = "postgres")]
- if start_pg_database {
- info!("Starting postgres db ...");
-
- thread::spawn(|| {
- execute_blocking(start_postgres(
- &POSTGRES_IN,
- &POSTGRES_HOST,
- &POSTGRES_PORT,
- &POSTGRES_STOPPED,
- ))
- });
-
- POSTGRES_IN.tx.send(FetchHost).unwrap();
- let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
-
- POSTGRES_IN.tx.send(FetchPort).unwrap();
- let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
-
- let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
- info!("Postgres uri is {pg_uri}");
-
- set_var("PG_URI", pg_uri);
- }
+ initialize_postgres_container(&options).await?;
// Run all tests in parallel, reporting failures at the end
//
@@ -277,11 +241,7 @@ async fn run_tests() -> Result<()> {
m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?;
#[cfg(feature = "postgres")]
- if start_pg_database {
- println!("Stopping postgres db ...");
- POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
- POSTGRES_STOPPED.rx.lock().await.recv().await;
- }
+ terminate_postgres_container().await?;
// report on any errors
if !errors.is_empty() {
@@ -294,14 +254,6 @@ async fn run_tests() -> Result<()> {
}
}
-#[cfg(feature = "postgres")]
-fn is_pg_uri_set() -> bool {
- match env::var("PG_URI") {
- Ok(_) => true,
- Err(_) => false,
- }
-}
-
async fn run_test_file(
test_file: TestFile,
validator: Validator,
@@ -758,87 +710,3 @@ impl Options {
}
}
}
-
-#[cfg(feature = "postgres")]
-pub async fn start_postgres(
- in_channel: &Channel<ContainerCommands>,
- host_channel: &Channel<String>,
- port_channel: &Channel<u16>,
- stopped_channel: &Channel<()>,
-) {
- info!("Starting postgres test container with user postgres/postgres and db test");
-
- let container = testcontainers_modules::postgres::Postgres::default()
- .with_user("postgres")
- .with_password("postgres")
- .with_db_name("test")
- .with_mapped_port(16432, 5432.tcp())
- .with_tag("17-alpine")
- .start()
- .await
- .unwrap();
- // uncomment this if you are running docker in docker
- // let host = "host.docker.internal".to_string();
- let host = container.get_host().await.unwrap().to_string();
- let port = container.get_host_port_ipv4(5432).await.unwrap();
-
- let mut rx = in_channel.rx.lock().await;
- while let Some(command) = rx.recv().await {
- match command {
- FetchHost => host_channel.tx.send(host.clone()).unwrap(),
- FetchPort => port_channel.tx.send(port).unwrap(),
- ContainerCommands::Stop => {
- container.stop().await.unwrap();
- stopped_channel.tx.send(()).unwrap();
- rx.close();
- }
- }
- }
-}
-
-#[cfg(feature = "postgres")]
-#[derive(Debug)]
-pub enum ContainerCommands {
- FetchHost,
- FetchPort,
- Stop,
-}
-
-#[cfg(feature = "postgres")]
-pub struct Channel<T> {
- pub tx: UnboundedSender<T>,
- pub rx: Mutex<UnboundedReceiver<T>>,
-}
-
-#[cfg(feature = "postgres")]
-pub fn channel<T>() -> Channel<T> {
- let (tx, rx) = mpsc::unbounded_channel();
- Channel {
- tx,
- rx: Mutex::new(rx),
- }
-}
-
-#[cfg(feature = "postgres")]
-pub fn execute_blocking<F: Future>(f: F) {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap()
- .block_on(f);
-}
-
-#[cfg(feature = "postgres")]
-pub struct HostPort {
- pub host: String,
- pub port: u16,
-}
-
-#[cfg(feature = "postgres")]
-static POSTGRES_IN: Lazy<Channel<ContainerCommands>> = Lazy::new(channel);
-#[cfg(feature = "postgres")]
-static POSTGRES_HOST: Lazy<Channel<String>> = Lazy::new(channel);
-#[cfg(feature = "postgres")]
-static POSTGRES_PORT: Lazy<Channel<u16>> = Lazy::new(channel);
-#[cfg(feature = "postgres")]
-static POSTGRES_STOPPED: Lazy<Channel<()>> = Lazy::new(channel);
diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
index 1439695d62..6391f666b4 100644
--- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
@@ -215,7 +215,6 @@ fn no_quotes(t: &str) -> &str {
fn schema_name(relative_path: &Path) -> String {
relative_path
.to_string_lossy()
- .to_string()
.chars()
.filter(|ch| ch.is_ascii_alphanumeric())
.collect::<String>()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]