This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 858c020423 Extract postgres container from sqllogictest, update 
datafusion-testing pin (#13971)
858c020423 is described below

commit 858c020423ac7fadd210fbc3983eb91e86951a40
Author: Bruce Ritchie <[email protected]>
AuthorDate: Thu Jan 2 13:29:26 2025 -0500

    Extract postgres container from sqllogictest, update datafusion-testing pin 
(#13971)
    
    * Add support for sqlite test files to sqllogictest
    
    * Removed workaround for bug that was fixed.
    
    * Refactor sqllogictest to extract postgres functionality into a separate 
file. Removed dependency on once_cell in favour of LazyLock.
    
    * Add missing license header.
---
 datafusion-testing                                 |   2 +-
 datafusion/sqllogictest/Cargo.toml                 |   2 -
 datafusion/sqllogictest/bin/postgres_container.rs  | 151 +++++++++++++++++++++
 datafusion/sqllogictest/bin/sqllogictests.rs       | 148 ++------------------
 .../src/engines/postgres_engine/mod.rs             |   1 -
 5 files changed, 160 insertions(+), 144 deletions(-)

diff --git a/datafusion-testing b/datafusion-testing
index e2e320c947..5cc59cecee 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304
+Subproject commit 5cc59ceceeebeea6b39861210b6d1cd27e66648a
diff --git a/datafusion/sqllogictest/Cargo.toml 
b/datafusion/sqllogictest/Cargo.toml
index 1bb88a8bd4..3104846eda 100644
--- a/datafusion/sqllogictest/Cargo.toml
+++ b/datafusion/sqllogictest/Cargo.toml
@@ -49,7 +49,6 @@ indicatif = "0.17"
 itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
-once_cell = { version = "1.20", optional = true }
 postgres-protocol = { version = "0.6.7", optional = true }
 postgres-types = { version = "0.2.8", features = ["derive", 
"with-chrono-0_4"], optional = true }
 rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
@@ -69,7 +68,6 @@ avro = ["datafusion/avro"]
 postgres = [
     "bytes",
     "chrono",
-    "once_cell",
     "postgres-types",
     "postgres-protocol",
     "testcontainers",
diff --git a/datafusion/sqllogictest/bin/postgres_container.rs 
b/datafusion/sqllogictest/bin/postgres_container.rs
new file mode 100644
index 0000000000..210b9b3e36
--- /dev/null
+++ b/datafusion/sqllogictest/bin/postgres_container.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![cfg(feature = "postgres")]
+
+use crate::Options;
+use datafusion_common::Result;
+use log::info;
+use std::env::set_var;
+use std::future::Future;
+use std::sync::LazyLock;
+use std::{env, thread};
+use testcontainers::core::IntoContainerPort;
+use testcontainers::runners::AsyncRunner;
+use testcontainers::ImageExt;
+use testcontainers_modules::postgres;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::{mpsc, Mutex};
+use ContainerCommands::{FetchHost, FetchPort};
+
+#[derive(Debug)]
+pub enum ContainerCommands {
+    FetchHost,
+    FetchPort,
+    Stop,
+}
+
+pub struct Channel<T> {
+    pub tx: UnboundedSender<T>,
+    pub rx: Mutex<UnboundedReceiver<T>>,
+}
+
+pub fn channel<T>() -> Channel<T> {
+    let (tx, rx) = mpsc::unbounded_channel();
+    Channel {
+        tx,
+        rx: Mutex::new(rx),
+    }
+}
+
+pub fn execute_blocking<F: Future>(f: F) {
+    tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(f);
+}
+
+static POSTGRES_IN: LazyLock<Channel<ContainerCommands>> = 
LazyLock::new(channel);
+static POSTGRES_HOST: LazyLock<Channel<String>> = LazyLock::new(channel);
+static POSTGRES_PORT: LazyLock<Channel<u16>> = LazyLock::new(channel);
+static POSTGRES_STOPPED: LazyLock<Channel<()>> = LazyLock::new(channel);
+
+pub async fn initialize_postgres_container(options: &Options) -> Result<()> {
+    let start_pg_database = options.postgres_runner && !is_pg_uri_set();
+    if start_pg_database {
+        info!("Starting postgres db ...");
+
+        thread::spawn(|| {
+            execute_blocking(start_postgres(
+                &POSTGRES_IN,
+                &POSTGRES_HOST,
+                &POSTGRES_PORT,
+                &POSTGRES_STOPPED,
+            ))
+        });
+
+        POSTGRES_IN.tx.send(FetchHost).unwrap();
+        let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
+
+        POSTGRES_IN.tx.send(FetchPort).unwrap();
+        let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
+
+        let pg_uri = 
format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
+        info!("Postgres uri is {pg_uri}");
+
+        set_var("PG_URI", pg_uri);
+    } else {
+        // close receiver
+        POSTGRES_IN.rx.lock().await.close();
+    }
+
+    Ok(())
+}
+
+pub async fn terminate_postgres_container() -> Result<()> {
+    if !POSTGRES_IN.tx.is_closed() {
+        println!("Stopping postgres db ...");
+        POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
+        POSTGRES_STOPPED.rx.lock().await.recv().await;
+    }
+
+    Ok(())
+}
+
+async fn start_postgres(
+    in_channel: &Channel<ContainerCommands>,
+    host_channel: &Channel<String>,
+    port_channel: &Channel<u16>,
+    stopped_channel: &Channel<()>,
+) {
+    info!("Starting postgres test container with user postgres/postgres and db 
test");
+
+    let container = postgres::Postgres::default()
+        .with_user("postgres")
+        .with_password("postgres")
+        .with_db_name("test")
+        .with_mapped_port(16432, 5432.tcp())
+        .with_tag("17-alpine")
+        .start()
+        .await
+        .unwrap();
+    // uncomment this if you are running docker in docker
+    // let host = "host.docker.internal".to_string();
+    let host = container.get_host().await.unwrap().to_string();
+    let port = container.get_host_port_ipv4(5432).await.unwrap();
+
+    let mut rx = in_channel.rx.lock().await;
+    while let Some(command) = rx.recv().await {
+        match command {
+            FetchHost => host_channel.tx.send(host.clone()).unwrap(),
+            FetchPort => port_channel.tx.send(port).unwrap(),
+            ContainerCommands::Stop => {
+                container.stop().await.unwrap();
+                stopped_channel.tx.send(()).unwrap();
+                rx.close();
+            }
+        }
+    }
+}
+
+fn is_pg_uri_set() -> bool {
+    match env::var("PG_URI") {
+        Ok(_) => true,
+        Err(_) => false,
+    }
+}
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs 
b/datafusion/sqllogictest/bin/sqllogictests.rs
index 498539c167..f6b35bf377 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -28,33 +28,21 @@ use indicatif::{
 use itertools::Itertools;
 use log::Level::{Info, Warn};
 use log::{info, log_enabled, warn};
-#[cfg(feature = "postgres")]
-use once_cell::sync::Lazy;
 use sqllogictest::{
     parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, 
Record,
     Validator,
 };
+
 #[cfg(feature = "postgres")]
-use std::env::set_var;
+use crate::postgres_container::{
+    initialize_postgres_container, terminate_postgres_container,
+};
 use std::ffi::OsStr;
 use std::fs;
-#[cfg(feature = "postgres")]
-use std::future::Future;
 use std::path::{Path, PathBuf};
+
 #[cfg(feature = "postgres")]
-use std::{env, thread};
-#[cfg(feature = "postgres")]
-use testcontainers::core::IntoContainerPort;
-#[cfg(feature = "postgres")]
-use testcontainers::runners::AsyncRunner;
-#[cfg(feature = "postgres")]
-use testcontainers::ImageExt;
-#[cfg(feature = "postgres")]
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
-#[cfg(feature = "postgres")]
-use tokio::sync::{mpsc, Mutex};
-#[cfg(feature = "postgres")]
-use ContainerCommands::{FetchHost, FetchPort};
+mod postgres_container;
 
 const TEST_DIRECTORY: &str = "test_files/";
 const DATAFUSION_TESTING_TEST_DIRECTORY: &str = 
"../../datafusion-testing/data/";
@@ -170,31 +158,7 @@ async fn run_tests() -> Result<()> {
     options.warn_on_ignored();
 
     #[cfg(feature = "postgres")]
-    let start_pg_database = options.postgres_runner && !is_pg_uri_set();
-    #[cfg(feature = "postgres")]
-    if start_pg_database {
-        info!("Starting postgres db ...");
-
-        thread::spawn(|| {
-            execute_blocking(start_postgres(
-                &POSTGRES_IN,
-                &POSTGRES_HOST,
-                &POSTGRES_PORT,
-                &POSTGRES_STOPPED,
-            ))
-        });
-
-        POSTGRES_IN.tx.send(FetchHost).unwrap();
-        let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
-
-        POSTGRES_IN.tx.send(FetchPort).unwrap();
-        let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
-
-        let pg_uri = 
format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
-        info!("Postgres uri is {pg_uri}");
-
-        set_var("PG_URI", pg_uri);
-    }
+    initialize_postgres_container(&options).await?;
 
     // Run all tests in parallel, reporting failures at the end
     //
@@ -277,11 +241,7 @@ async fn run_tests() -> Result<()> {
     m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?;
 
     #[cfg(feature = "postgres")]
-    if start_pg_database {
-        println!("Stopping postgres db ...");
-        POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
-        POSTGRES_STOPPED.rx.lock().await.recv().await;
-    }
+    terminate_postgres_container().await?;
 
     // report on any errors
     if !errors.is_empty() {
@@ -294,14 +254,6 @@ async fn run_tests() -> Result<()> {
     }
 }
 
-#[cfg(feature = "postgres")]
-fn is_pg_uri_set() -> bool {
-    match env::var("PG_URI") {
-        Ok(_) => true,
-        Err(_) => false,
-    }
-}
-
 async fn run_test_file(
     test_file: TestFile,
     validator: Validator,
@@ -758,87 +710,3 @@ impl Options {
         }
     }
 }
-
-#[cfg(feature = "postgres")]
-pub async fn start_postgres(
-    in_channel: &Channel<ContainerCommands>,
-    host_channel: &Channel<String>,
-    port_channel: &Channel<u16>,
-    stopped_channel: &Channel<()>,
-) {
-    info!("Starting postgres test container with user postgres/postgres and db 
test");
-
-    let container = testcontainers_modules::postgres::Postgres::default()
-        .with_user("postgres")
-        .with_password("postgres")
-        .with_db_name("test")
-        .with_mapped_port(16432, 5432.tcp())
-        .with_tag("17-alpine")
-        .start()
-        .await
-        .unwrap();
-    // uncomment this if you are running docker in docker
-    // let host = "host.docker.internal".to_string();
-    let host = container.get_host().await.unwrap().to_string();
-    let port = container.get_host_port_ipv4(5432).await.unwrap();
-
-    let mut rx = in_channel.rx.lock().await;
-    while let Some(command) = rx.recv().await {
-        match command {
-            FetchHost => host_channel.tx.send(host.clone()).unwrap(),
-            FetchPort => port_channel.tx.send(port).unwrap(),
-            ContainerCommands::Stop => {
-                container.stop().await.unwrap();
-                stopped_channel.tx.send(()).unwrap();
-                rx.close();
-            }
-        }
-    }
-}
-
-#[cfg(feature = "postgres")]
-#[derive(Debug)]
-pub enum ContainerCommands {
-    FetchHost,
-    FetchPort,
-    Stop,
-}
-
-#[cfg(feature = "postgres")]
-pub struct Channel<T> {
-    pub tx: UnboundedSender<T>,
-    pub rx: Mutex<UnboundedReceiver<T>>,
-}
-
-#[cfg(feature = "postgres")]
-pub fn channel<T>() -> Channel<T> {
-    let (tx, rx) = mpsc::unbounded_channel();
-    Channel {
-        tx,
-        rx: Mutex::new(rx),
-    }
-}
-
-#[cfg(feature = "postgres")]
-pub fn execute_blocking<F: Future>(f: F) {
-    tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap()
-        .block_on(f);
-}
-
-#[cfg(feature = "postgres")]
-pub struct HostPort {
-    pub host: String,
-    pub port: u16,
-}
-
-#[cfg(feature = "postgres")]
-static POSTGRES_IN: Lazy<Channel<ContainerCommands>> = Lazy::new(channel);
-#[cfg(feature = "postgres")]
-static POSTGRES_HOST: Lazy<Channel<String>> = Lazy::new(channel);
-#[cfg(feature = "postgres")]
-static POSTGRES_PORT: Lazy<Channel<u16>> = Lazy::new(channel);
-#[cfg(feature = "postgres")]
-static POSTGRES_STOPPED: Lazy<Channel<()>> = Lazy::new(channel);
diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs 
b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
index 1439695d62..6391f666b4 100644
--- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs
@@ -215,7 +215,6 @@ fn no_quotes(t: &str) -> &str {
 fn schema_name(relative_path: &Path) -> String {
     relative_path
         .to_string_lossy()
-        .to_string()
         .chars()
         .filter(|ch| ch.is_ascii_alphanumeric())
         .collect::<String>()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to