alamb commented on code in PR #4834:
URL: https://github.com/apache/arrow-datafusion/pull/4834#discussion_r1082610209


##########
datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs:
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{array, array::ArrayRef, datatypes::DataType, 
record_batch::RecordBatch};
+use datafusion::error::DataFusionError;
+use sqllogictest::{ColumnType, DBOutput};
+
+use super::super::conversion::*;
+use super::error::{DFSqlLogicTestError, Result};
+
+/// Converts `batches` to a DBOutput as expected by sqllogictest.
+///
+/// Assumes empty record batches are a successful statement completion
+///
+pub fn convert_batches(
+    batches: Vec<RecordBatch>,
+    is_pg_compatibility_test: bool,
+) -> Result<DBOutput> {
+    if batches.is_empty() {
+        // DataFusion doesn't report the number of rows completed
+        return Ok(DBOutput::StatementComplete(0));
+    }
+
+    let schema = batches[0].schema();
+
+    // TODO: report the actual types of the result
+    // https://github.com/apache/arrow-datafusion/issues/4499
+    let types = vec![ColumnType::Any; batches[0].num_columns()];
+
+    let mut rows = vec![];
+    for batch in batches {
+        // Verify schema
+        if schema != batch.schema() {
+            return 
Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal(
+                format!(
+                    "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
+                    schema,
+                    batch.schema()
+                ),
+            )));
+        }
+        rows.append(&mut convert_batch(batch, is_pg_compatibility_test)?);
+    }
+
+    Ok(DBOutput::Rows { types, rows })
+}
+
+/// Convert a single batch to a `Vec<Vec<String>>` for comparison
+fn convert_batch(
+    batch: RecordBatch,
+    is_pg_compatibility_test: bool,
+) -> Result<Vec<Vec<String>>> {
+    (0..batch.num_rows())
+        .map(|row| {
+            batch
+                .columns()
+                .iter()
+                .map(|col| cell_to_string(col, row, is_pg_compatibility_test))
+                .collect::<Result<Vec<String>>>()
+        })
+        .collect()
+}
+
+macro_rules! get_row_value {
+    ($array_type:ty, $column: ident, $row: ident) => {{
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+
+        array.value($row)
+    }};
+}
+
+/// Normalizes the content of a single cell in RecordBatch prior to printing.
+///
+/// This is to make the output comparable to the semi-standard .slt format
+///
+/// Normalizations applied to [NULL Values and empty strings]
+///
+/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings
+///
+/// Floating-point numbers are rounded to have a consistent representation with the Postgres runner.
+///
+pub fn cell_to_string(
+    col: &ArrayRef,
+    row: usize,
+    is_pg_compatibility_test: bool,
+) -> Result<String> {
+    if !col.is_valid(row) {
+        // represent any null value with the string "NULL"
+        Ok(NULL_STR.to_string())
+    } else if is_pg_compatibility_test {

Review Comment:
   👍  I am not sure the very-high-precision floating-point results are 
necessary. I wonder if we could simply use the lower-precision normalization in 
all the tests (both pg-compat and non-pg-compat) 🤔 



##########
datafusion/core/tests/sqllogictests/src/engines/postgres/mod.rs:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use log::info;
+use sqllogictest::{ColumnType, DBOutput};
+use testcontainers::core::WaitFor;
+use testcontainers::images::generic::GenericImage;
+use tokio::task::JoinHandle;
+
+pub struct Postgres {
+    client: Arc<tokio_postgres::Client>,
+    join_handle: JoinHandle<()>,
+}
+
+pub const PG_USER: &str = "postgres";
+pub const PG_PASSWORD: &str = "postgres";
+pub const PG_DB: &str = "test";
+pub const PG_PORT: u16 = 5432;
+
+impl Postgres {
+    pub fn postgres_docker_image() -> GenericImage {
+        let postgres_test_data = match datafusion::test_util::get_data_dir(
+            "POSTGRES_TEST_DATA",
+            "tests/sqllogictests/postgres",
+        ) {
+            Ok(pb) => pb.display().to_string(),
+            Err(err) => panic!("failed to get arrow data dir: {err}"),
+        };
+        GenericImage::new("postgres", "15")
+            .with_wait_for(WaitFor::message_on_stderr(
+                "database system is ready to accept connections",
+            ))
+            .with_env_var("POSTGRES_DB", PG_DB)
+            .with_env_var("POSTGRES_USER", PG_USER)
+            .with_env_var("POSTGRES_PASSWORD", PG_PASSWORD)
+            .with_env_var(
+                "POSTGRES_INITDB_ARGS",
+                "--encoding=UTF-8 --lc-collate=C --lc-ctype=C",
+            )
+            .with_exposed_port(PG_PORT)
+            .with_volume(
+                format!(
+                    "{0}/csv/aggregate_test_100.csv",
+                    datafusion::test_util::arrow_test_data()
+                ),
+                "/opt/data/csv/aggregate_test_100.csv",
+            )
+            .with_volume(
+                format!("{0}/postgres_create_table.sql", postgres_test_data),
+                "/docker-entrypoint-initdb.d/0_create_table.sql",
+            )
+    }
+
+    pub async fn connect_with_retry(

Review Comment:
   👍 
   
   I think in general the approach of orchestrating the Docker containers using 
GitHub CI worked well, as opposed to restarting the containers within the tests.



##########
datafusion/core/tests/sqllogictests/src/engines/postgres/mod.rs:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use log::debug;
+use sqllogictest::{ColumnType, DBOutput};
+use tokio::task::JoinHandle;
+
+use super::conversion::*;
+use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
+use postgres_types::Type;
+use rust_decimal::Decimal;
+use tokio_postgres::{Column, Row};
+
+pub mod image;
+
+pub struct Postgres {
+    client: Arc<tokio_postgres::Client>,
+    join_handle: JoinHandle<()>,
+    file_name: String,
+}
+
+impl Postgres {
+    pub async fn connect_with_retry(
+        file_name: String,
+        host: &str,
+        port: u16,
+        db: &str,
+        user: &str,
+        pass: &str,
+    ) -> Result<Self, tokio_postgres::error::Error> {
+        let mut retry = 0;
+        loop {
+            let connection_result =
+                Postgres::connect(file_name.clone(), host, port, db, user, 
pass).await;
+            match connection_result {
+                Err(e) if retry <= 3 => {
+                    debug!("Retrying connection error '{:?}'", e);
+                    retry += 1;
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                }
+                result => break result,
+            }
+        }
+    }
+
+    async fn connect(
+        file_name: String,
+        host: &str,
+        port: u16,
+        db: &str,
+        user: &str,
+        pass: &str,
+    ) -> Result<Self, tokio_postgres::error::Error> {
+        let (client, connection) = tokio_postgres::Config::new()
+            .host(host)
+            .port(port)
+            .dbname(db)
+            .user(user)
+            .password(pass)
+            .connect(tokio_postgres::NoTls)
+            .await?;
+
+        let join_handle = tokio::spawn(async move {
+            if let Err(e) = connection.await {
+                log::error!("Postgres connection error: {:?}", e);
+            }
+        });
+
+        Ok(Self {
+            client: Arc::new(client),
+            join_handle,
+            file_name,
+        })
+    }
+}
+
+impl Drop for Postgres {
+    fn drop(&mut self) {
+        self.join_handle.abort()
+    }
+}
+
+macro_rules! make_string {
+    ($row:ident, $idx:ident, $t:ty) => {{
+        let value: Option<$t> = $row.get($idx);
+        match value {
+            Some(value) => value.to_string(),
+            None => NULL_STR.to_string(),
+        }
+    }};
+    ($row:ident, $idx:ident, $t:ty, $convert:ident) => {{
+        let value: Option<$t> = $row.get($idx);
+        match value {
+            Some(value) => $convert(value).to_string(),
+            None => NULL_STR.to_string(),
+        }
+    }};
+}
+
+fn cell_to_string(row: &Row, column: &Column, idx: usize) -> String {

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to