melgenek commented on code in PR #4834:
URL: https://github.com/apache/arrow-datafusion/pull/4834#discussion_r1080579104
##########
datafusion/core/Cargo.toml:
##########
@@ -104,17 +104,22 @@ xz2 = { version = "0.1", optional = true }
[dev-dependencies]
arrow = { version = "31.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
+bigdecimal = "0.3.0"
criterion = "0.4"
csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.10"
+half = "2.2.1"
parquet-test-utils = { path = "../../parquet-test-utils" }
+postgres-types = { version = "0.2.4", features = ["derive", "with-chrono-0_4"] }
rstest = "0.16.0"
+rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
sqllogictest = "0.10.0"
test-utils = { path = "../../test-utils" }
+testcontainers = "0.14.0"
thiserror = "1.0.37"
-
+tokio-postgres = "0.7.7"
Review Comment:
Here are the reasons for these dependencies:
- `half` - provides the `f16` type used by DataFusion
- `testcontainers` - creates a fresh Docker container with Postgres for each sqllogictest file.
- `postgres-types` and `tokio-postgres` - these are required for writing a Postgres client
- `rust_decimal` - converts the Postgres `numeric` type to a Rust type
- `bigdecimal` - provides a common type for rounding floating-point numbers. `rust_decimal`, unfortunately, doesn't handle numbers of arbitrary precision. For example, `rust_decimal` cannot parse `26156334342021890000000000000000000000`, which is currently present in one of the `.slt` tests in DataFusion (see the sketch after this list).
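A minimal sketch of that limitation, assuming only the `rust_decimal` and `bigdecimal` crates listed above:

```rust
use std::str::FromStr;

use bigdecimal::BigDecimal;
use rust_decimal::Decimal;

fn main() {
    // ~2.6e37 exceeds rust_decimal's 96-bit mantissa (max ~7.9e28),
    // so parsing overflows...
    let s = "26156334342021890000000000000000000000";
    assert!(Decimal::from_str(s).is_err());
    // ...while BigDecimal stores digits with arbitrary precision.
    assert!(BigDecimal::from_str(s).is_ok());
}
```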
##########
datafusion/core/Cargo.toml:
##########
@@ -112,8 +112,9 @@ parquet-test-utils = { path = "../../parquet-test-utils" }
rstest = "0.16.0"
sqllogictest = "0.10.0"
test-utils = { path = "../../test-utils" }
+tokio-postgres = "0.7.7"
thiserror = "1.0.37"
-
+testcontainers = "0.14.0"
Review Comment:
Added a `PG_COMPAT` env var. When it is set, the sqllogictest suite uses Postgres as the runner. A sketch of the wiring is below.
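A minimal sketch, assuming `Options::new()` reads the env var directly (the parsing code itself is not shown in this diff, so the exact shape is my assumption):

```rust
use std::env;

struct Options {
    /// Use the Postgres runner for `pg_compat_*` files (set via `PG_COMPAT`).
    postgres_runner: bool,
    /// Regenerate expected outputs instead of validating them.
    complete_mode: bool,
}

impl Options {
    fn new() -> Self {
        Self {
            postgres_runner: env::var("PG_COMPAT").is_ok(),
            complete_mode: env::args().any(|arg| arg == "--complete"),
        }
    }
}
```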
##########
datafusion/core/tests/sqllogictests/src/engines/conversion.rs:
##########
@@ -0,0 +1,73 @@
+use bigdecimal::BigDecimal;
+use half::f16;
+use rust_decimal::prelude::*;
+use rust_decimal::Decimal;
+
+pub const NULL_STR: &str = "NULL";
+
+pub fn bool_to_str(value: bool) -> String {
+ if value {
+ "true".to_string()
+ } else {
+ "false".to_string()
+ }
+}
+
+pub fn varchar_to_str(value: &str) -> String {
+ if value.is_empty() {
+ "(empty)".to_string()
+ } else {
+ value.to_string()
+ }
+}
+
+pub fn f16_to_str(value: f16) -> String {
+ if value.is_nan() {
+ "NaN".to_string()
+ } else if value == f16::INFINITY {
+ "Infinity".to_string()
+ } else if value == f16::NEG_INFINITY {
+ "-Infinity".to_string()
+ } else {
+ big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
+ }
+}
+
+pub fn f32_to_str(value: f32) -> String {
+ if value.is_nan() {
+ "NaN".to_string()
+ } else if value == f32::INFINITY {
+ "Infinity".to_string()
+ } else if value == f32::NEG_INFINITY {
+ "-Infinity".to_string()
+ } else {
+ big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
+ }
+}
+
+pub fn f64_to_str(value: f64) -> String {
+ if value.is_nan() {
+ "NaN".to_string()
+ } else if value == f64::INFINITY {
+ "Infinity".to_string()
+ } else if value == f64::NEG_INFINITY {
+ "-Infinity".to_string()
+ } else {
+ big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
+ }
+}
+
+pub fn i128_to_str(value: i128, scale: u32) -> String {
+ big_decimal_to_str(
+        BigDecimal::from_str(&Decimal::from_i128_with_scale(value, scale).to_string())
+ .unwrap(),
+ )
+}
+
+pub fn decimal_to_str(value: Decimal) -> String {
+ big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
+}
+
+pub fn big_decimal_to_str(value: BigDecimal) -> String {
+ value.round(12).normalized().to_string()
Review Comment:
All numbers are rounded to `12` decimal digits. Without explicit types, Postgres and DataFusion can choose different underlying types. For example, Postgres could choose `numeric` where DataFusion uses `int`. In order to compare results, all floating-point types are converted to the same number of decimal places.
`12` was chosen to pass the existing set of tests. I think it could still produce errors, for example when rounding an `f16` to 12 digits. I would probably use `3` (or `4`) decimal digits if high precision is not required for the Postgres compatibility tests. `3` or `4` is the expected number of significant decimal digits for a 16-bit binary float according to [IEEE_754](https://en.wikipedia.org/wiki/IEEE_754#Basic_and_interchange_formats), so it should be safer to round to what the smallest data type can represent.
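For illustration, the behavior of `round(12).normalized()` with the `bigdecimal` crate (the input values here are made up):

```rust
use std::str::FromStr;

use bigdecimal::BigDecimal;

fn main() {
    // Rounding caps the number of fractional digits at 12...
    let third = BigDecimal::from_str("0.3333333333333333").unwrap();
    assert_eq!(third.round(12).to_string(), "0.333333333333");

    // ...and `normalized()` strips trailing zeros, so both runners print
    // the same text even if one computed "1.50" and the other "1.5".
    let x = BigDecimal::from_str("1.50").unwrap();
    assert_eq!(x.round(12).normalized().to_string(), "1.5");
}
```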
##########
datafusion/core/tests/sqllogictests/src/engines/postgres/mod.rs:
##########
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use log::info;
+use sqllogictest::{ColumnType, DBOutput};
+use testcontainers::core::WaitFor;
+use testcontainers::images::generic::GenericImage;
+use tokio::task::JoinHandle;
+
+pub struct Postgres {
Review Comment:
The implementation changed to provide the same type conversions for both DataFusion and Postgres.
##########
datafusion/core/tests/sqllogictests/src/engines/postgres/mod.rs:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use log::debug;
+use sqllogictest::{ColumnType, DBOutput};
+use tokio::task::JoinHandle;
+
+use super::conversion::*;
+use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
+use postgres_types::Type;
+use rust_decimal::Decimal;
+use tokio_postgres::{Column, Row};
+
+pub mod image;
+
+pub struct Postgres {
+ client: Arc<tokio_postgres::Client>,
+ join_handle: JoinHandle<()>,
+ file_name: String,
+}
+
+impl Postgres {
+ pub async fn connect_with_retry(
+ file_name: String,
+ host: &str,
+ port: u16,
+ db: &str,
+ user: &str,
+ pass: &str,
+ ) -> Result<Self, tokio_postgres::error::Error> {
+ let mut retry = 0;
+ loop {
+ let connection_result =
+                Postgres::connect(file_name.clone(), host, port, db, user, pass).await;
+ match connection_result {
+ Err(e) if retry <= 3 => {
+ debug!("Retrying connection error '{:?}'", e);
+ retry += 1;
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ }
+ result => break result,
+ }
+ }
+ }
+
+ async fn connect(
+ file_name: String,
+ host: &str,
+ port: u16,
+ db: &str,
+ user: &str,
+ pass: &str,
+ ) -> Result<Self, tokio_postgres::error::Error> {
+ let (client, connection) = tokio_postgres::Config::new()
+ .host(host)
+ .port(port)
+ .dbname(db)
+ .user(user)
+ .password(pass)
+ .connect(tokio_postgres::NoTls)
+ .await?;
+
+ let join_handle = tokio::spawn(async move {
+ if let Err(e) = connection.await {
+ log::error!("Postgres connection error: {:?}", e);
+ }
+ });
+
+ Ok(Self {
+ client: Arc::new(client),
+ join_handle,
+ file_name,
+ })
+ }
+}
+
+impl Drop for Postgres {
+ fn drop(&mut self) {
+ self.join_handle.abort()
+ }
+}
+
+macro_rules! make_string {
+ ($row:ident, $idx:ident, $t:ty) => {{
+ let value: Option<$t> = $row.get($idx);
+ match value {
+ Some(value) => value.to_string(),
+ None => NULL_STR.to_string(),
+ }
+ }};
+ ($row:ident, $idx:ident, $t:ty, $convert:ident) => {{
+ let value: Option<$t> = $row.get($idx);
+ match value {
+ Some(value) => $convert(value).to_string(),
+ None => NULL_STR.to_string(),
+ }
+ }};
+}
+
+fn cell_to_string(row: &Row, column: &Column, idx: usize) -> String {
Review Comment:
The implementation of the Postgres client is based on the "postgres-extended" engine from the sqllogictest-rs repo: https://github.com/risinglightdb/sqllogictest-rs/blob/2fb7e36e1857fd6b7949956b496c26ddc463f858/sqllogictest-bin/src/engines/postgres_extended.rs.
All type conversions now happen in Rust, as in the DataFusion engine, so every type has to be converted to a string explicitly.
I implemented the types marked as implemented in https://github.com/apache/arrow-datafusion/blob/master/docs/source/user-guide/sql/data_types.md and made their text representation compatible with DataFusion.
I haven't implemented arrays. A trimmed sketch of the dispatch is below.
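A trimmed sketch of the dispatch, reusing the `make_string!` macro and the conversion helpers imported above (the arm list here is an illustrative subset, not the full implementation):

```rust
fn cell_to_string(row: &Row, column: &Column, idx: usize) -> String {
    let ty = column.type_();
    if ty == &Type::BOOL {
        make_string!(row, idx, bool, bool_to_str)
    } else if ty == &Type::INT4 {
        make_string!(row, idx, i32)
    } else if ty == &Type::FLOAT8 {
        make_string!(row, idx, f64, f64_to_str)
    } else if ty == &Type::NUMERIC {
        make_string!(row, idx, Decimal, decimal_to_str)
    } else if ty == &Type::TEXT || ty == &Type::VARCHAR {
        make_string!(row, idx, &str, varchar_to_str)
    } else {
        unimplemented!("Unsupported type: {}", ty)
    }
}
```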
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -15,144 +15,162 @@
// specific language governing permissions and limitations
// under the License.
-use async_trait::async_trait;
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::prelude::SessionContext;
-use datafusion_sql::parser::{DFParser, Statement};
-use log::info;
-use normalize::convert_batches;
-use sqllogictest::DBOutput;
-use sqlparser::ast::Statement as SQLStatement;
-use std::path::Path;
-use std::time::Duration;
-
-use crate::error::{DFSqlLogicTestError, Result};
-use crate::insert::insert;
-
-mod error;
-mod insert;
-mod normalize;
-mod setup;
-mod utils;
+use std::error::Error;
+use std::path::{Path, PathBuf};
-const TEST_DIRECTORY: &str = "tests/sqllogictests/test_files";
-
-pub struct DataFusion {
- ctx: SessionContext,
- file_name: String,
-}
+use log::{debug, info};
+use testcontainers::clients::Cli as Docker;
-#[async_trait]
-impl sqllogictest::AsyncDB for DataFusion {
- type Error = DFSqlLogicTestError;
+use datafusion::prelude::SessionContext;
- async fn run(&mut self, sql: &str) -> Result<DBOutput> {
- println!("[{}] Running query: \"{}\"", self.file_name, sql);
- let result = run_query(&self.ctx, sql).await?;
- Ok(result)
- }
+use crate::engines::datafusion::DataFusion;
+use crate::engines::postgres;
+use crate::engines::postgres::image::{PG_DB, PG_PASSWORD, PG_PORT, PG_USER};
+use crate::engines::postgres::Postgres;
- /// Engine name of current database.
- fn engine_name(&self) -> &str {
- "DataFusion"
- }
+mod engines;
+mod setup;
+mod utils;
- /// [`Runner`] calls this function to perform sleep.
- ///
-    /// The default implementation is `std::thread::sleep`, which is universial to any async runtime
-    /// but would block the current thread. If you are running in tokio runtime, you should override
- /// this by `tokio::time::sleep`.
- async fn sleep(dur: Duration) {
- tokio::time::sleep(dur).await;
- }
-}
+const TEST_DIRECTORY: &str = "tests/sqllogictests/test_files";
+const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_";
#[tokio::main]
#[cfg(target_family = "windows")]
-pub async fn main() -> Result<()> {
+pub async fn main() -> Result<(), Box<dyn Error>> {
println!("Skipping test on windows");
Ok(())
}
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<()> {
+pub async fn main() -> Result<(), Box<dyn Error>> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
-
- use sqllogictest::{default_validator, update_test_file};
env_logger::init();
let options = Options::new();
- // default to all files in test directory filtering based on name
- let files: Vec<_> = std::fs::read_dir(TEST_DIRECTORY)
- .unwrap()
- .map(|path| path.unwrap().path())
- .filter(|path| options.check_test_file(path.as_path()))
- .collect();
+ let files: Vec<_> = read_test_files(&options);
info!("Running test files {:?}", files);
for path in files {
- println!("Running: {}", path.display());
-
        let file_name = path.file_name().unwrap().to_str().unwrap().to_string();
+        let is_pg_compatibility_test = file_name.starts_with(PG_COMPAT_FILE_PREFIX);
- // Create the test runner
- let ctx = context_for_test_file(&file_name).await;
-        let mut runner = sqllogictest::Runner::new(DataFusion { ctx, file_name });
-
- // run each file using its own new DB
- //
- // We could run these tests in parallel eventually if we wanted.
if options.complete_mode {
- info!("Using complete mode to complete {}", path.display());
- let col_separator = " ";
- let validator = default_validator;
- update_test_file(path, runner, col_separator, validator)
- .await
- .map_err(|e| e.to_string())?;
+            run_complete_file(&path, file_name, is_pg_compatibility_test).await?;
+ } else if options.postgres_runner {
+ if is_pg_compatibility_test {
+ run_test_file_with_postgres(&path, file_name).await?;
+ } else {
+ debug!("Skipping test file {:?}", path);
+ }
Review Comment:
Tests whose names start with `pg_compat_` run with DataFusion during `cargo test`.
In order to check compatibility with Postgres, there is a GitHub job that runs these tests with a Postgres runner.
Thus DataFusion runs all the time, and Postgres runs only explicitly. This way `cargo test` is not affected by the overhead of starting Docker containers and executing Postgres queries.
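A sketch of the per-file container setup with `testcontainers` (the image tag, password, and readiness message are my assumptions; the real constants live in `engines::postgres::image`):

```rust
use testcontainers::clients::Cli as Docker;
use testcontainers::core::WaitFor;
use testcontainers::images::generic::GenericImage;
use testcontainers::Container;

fn start_postgres(docker: &Docker) -> Container<'_, GenericImage> {
    let image = GenericImage::new("postgres", "15")
        .with_env_var("POSTGRES_PASSWORD", "postgres")
        .with_wait_for(WaitFor::message_on_stderr(
            "database system is ready to accept connections",
        ));
    // The container lives as long as the returned guard, i.e. for the
    // duration of one test file; dropping it removes the database.
    docker.run(image)
}
```

The mapped host port would then come from `container.get_host_port_ipv4(5432)` and be passed to `Postgres::connect_with_retry`.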
##########
datafusion/core/tests/sqllogictests/src/engines/datafusion/normalize.rs:
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch};
+use datafusion::error::DataFusionError;
+use sqllogictest::{ColumnType, DBOutput};
+
+use super::super::conversion::*;
+use super::error::{DFSqlLogicTestError, Result};
+
+/// Converts `batches` to a DBOutput as expected by sqllogictest.
+///
+/// Assumes empty record batches are a successful statement completion
+///
+pub fn convert_batches(
+ batches: Vec<RecordBatch>,
+ is_pg_compatibility_test: bool,
+) -> Result<DBOutput> {
+ if batches.is_empty() {
+ // DataFusion doesn't report number of rows complete
+ return Ok(DBOutput::StatementComplete(0));
+ }
+
+ let schema = batches[0].schema();
+
+    // TODO: report the actual types of the result
+ // https://github.com/apache/arrow-datafusion/issues/4499
+ let types = vec![ColumnType::Any; batches[0].num_columns()];
+
+ let mut rows = vec![];
+ for batch in batches {
+ // Verify schema
+ if schema != batch.schema() {
+            return Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal(
+ format!(
+ "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
+ schema,
+ batch.schema()
+ ),
+ )));
+ }
+ rows.append(&mut convert_batch(batch, is_pg_compatibility_test)?);
+ }
+
+ Ok(DBOutput::Rows { types, rows })
+}
+
+/// Convert a single batch to a `Vec<Vec<String>>` for comparison
+fn convert_batch(
+ batch: RecordBatch,
+ is_pg_compatibility_test: bool,
+) -> Result<Vec<Vec<String>>> {
+ (0..batch.num_rows())
+ .map(|row| {
+ batch
+ .columns()
+ .iter()
+ .map(|col| cell_to_string(col, row, is_pg_compatibility_test))
+ .collect::<Result<Vec<String>>>()
+ })
+ .collect()
+}
+
+macro_rules! get_row_value {
+ ($array_type:ty, $column: ident, $row: ident) => {{
+ let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+
+ array.value($row)
+ }};
+}
+
+/// Normalizes the content of a single cell in RecordBatch prior to printing.
+///
+/// This is to make the output comparable to the semi-standard .slt format
+///
+/// Normalizations applied to [NULL Values and empty strings]
+///
+/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings
+///
+/// Floating-point numbers are rounded to have a consistent representation with the Postgres runner.
+///
+pub fn cell_to_string(
+ col: &ArrayRef,
+ row: usize,
+ is_pg_compatibility_test: bool,
+) -> Result<String> {
+ if !col.is_valid(row) {
+ // represent any null value with the string "NULL"
+ Ok(NULL_STR.to_string())
+ } else if is_pg_compatibility_test {
Review Comment:
If a test is a Postgres compatibility test, then some conversions apply. The same conversions are used in the Postgres client implementation.
This is needed because the existing sqllogictests use a higher precision for floating-point results, which would produce different results in Postgres. A trimmed sketch of the compatibility branch is below.
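A trimmed sketch of that branch, reusing the `get_row_value!` macro and the shared conversion helpers above (the function name and the subset of types shown are illustrative):

```rust
fn cell_to_string_pg_compatible(col: &ArrayRef, row: usize) -> Result<String> {
    match col.data_type() {
        DataType::Boolean => Ok(bool_to_str(get_row_value!(array::BooleanArray, col, row))),
        DataType::Float16 => Ok(f16_to_str(get_row_value!(array::Float16Array, col, row))),
        DataType::Float64 => Ok(f64_to_str(get_row_value!(array::Float64Array, col, row))),
        DataType::Decimal128(_, scale) => {
            let value = get_row_value!(array::Decimal128Array, col, row);
            Ok(i128_to_str(value, *scale as u32))
        }
        DataType::Utf8 => Ok(varchar_to_str(get_row_value!(array::StringArray, col, row))),
        // Remaining types fall back to Arrow's default display (elided here).
        _ => todo!(),
    }
}
```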