This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-22529-0d3fb77a136d77c8e9aa34aa5db2d6162e256232 in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 7127f8bd79a7ab40cc0189540b912c5415ed6caa Author: Kumar Ujjawal <[email protected]> AuthorDate: Sat Jun 6 01:22:34 2026 +0530 feat: Add Spark SQL parser dialect config (#22529) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21653. ## Rationale for this change This lets users combine Spark-compatible functions with Spark SQL parsing when they choose the dialect. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Add `Spark` to the SQL parser dialect config enum. - Accept `spark` and `sparksql` as dialect config values. - Use Spark dialect for Spark sqllogictests. - Keep `with_spark_features()` config-neutral. - Update dialect docs and expected config output. - Add tests for Spark dialect parsing and Spark SQL execution with Spark functions. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? Users can now set `datafusion.sql_parser.dialect` to `spark` or `sparksql`. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <[email protected]> --- Cargo.lock | 1 + datafusion-cli/src/exec.rs | 12 +++----- datafusion/common/src/config.rs | 31 ++++++++++++++++--- datafusion/core/src/execution/session_state.rs | 10 +++--- datafusion/spark/Cargo.toml | 3 +- datafusion/spark/src/session_state.rs | 36 ++++++++++++++++++++++ datafusion/sqllogictest/src/test_context.rs | 4 +++ .../sqllogictest/test_files/information_schema.slt | 2 +- .../test_files/spark/collection/size.slt | 2 +- docs/source/library-user-guide/upgrading/54.0.0.md | 5 +++ docs/source/user-guide/configs.md | 2 +- 11 files changed, 86 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cdfd97abcd..67f9c1cf47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2579,6 +2579,7 @@ dependencies = [ "serde_json", "sha1 0.11.0", "sha2", + "tokio", "twox-hash", "url", ] diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 09347d6d7d..800e33f645 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -28,7 +28,7 @@ use crate::{ }; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; -use datafusion::config::ConfigFileType; +use datafusion::config::{ConfigFileType, Dialect}; use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::memory_pool::MemoryConsumer; @@ -223,9 +223,8 @@ pub(super) async fn exec_and_print( let dialect = &options.sql_parser.dialect; let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; @@ -613,9 +612,8 @@ mod tests { let dialect = &task_ctx.session_config().options().sql_parser.dialect; let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; for location in locations { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ab1405054c..4025157cef 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -279,7 +279,7 @@ config_namespace! { pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, - /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. + /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks and Spark. pub dialect: Dialect, default = Dialect::Generic // no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive @@ -342,6 +342,13 @@ pub enum Dialect { Ansi, DuckDB, Databricks, + Spark, +} + +impl Dialect { + /// List of all supported dialect names, for use in error messages. + pub const AVAILABLE: &'static str = "Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ + MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks, Spark"; } impl AsRef<str> for Dialect { @@ -360,6 +367,7 @@ impl AsRef<str> for Dialect { Self::Ansi => "ansi", Self::DuckDB => "duckdb", Self::Databricks => "databricks", + Self::Spark => "spark", } } } @@ -382,11 +390,12 @@ impl FromStr for Dialect { "ansi" => Self::Ansi, "duckdb" => Self::DuckDB, "databricks" => Self::Databricks, + "spark" | "sparksql" => Self::Spark, other => { - let error_message = format!( - "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks" - ); - return Err(DataFusionError::Configuration(error_message)); + return Err(DataFusionError::Configuration(format!( + "Invalid Dialect: {other}. Expected one of: {}", + Self::AVAILABLE + ))); } }; Ok(value) @@ -4161,6 +4170,18 @@ mod tests { assert_eq!(cdc.norm_level, 0); } + #[test] + fn test_dialect_spark_roundtrip() { + use crate::config::Dialect; + use std::str::FromStr; + + assert_eq!(Dialect::from_str("spark").unwrap(), Dialect::Spark); + assert_eq!(Dialect::from_str("sparksql").unwrap(), Dialect::Spark); + assert_eq!(Dialect::from_str("SPARK").unwrap(), Dialect::Spark); + assert_eq!(Dialect::Spark.as_ref(), "spark"); + assert_eq!(Dialect::Spark.to_string(), "spark"); + } + #[test] fn max_row_group_bytes_rejects_zero() { use crate::config::MaxRowGroupBytes; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ed2ea27cf4..dfd1eea709 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -439,9 +439,8 @@ impl SessionState { ) -> datafusion_common::Result<Statement> { let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; @@ -488,9 +487,8 @@ impl SessionState { ) -> datafusion_common::Result<SQLExprWithAlias> { let dialect = dialect_from_str(dialect).ok_or_else(|| { plan_datafusion_err!( - "Unsupported SQL dialect: {dialect}. Available dialects: \ - Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ - MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks." + "Unsupported SQL dialect: {dialect}. Available dialects: {}.", + Dialect::AVAILABLE ) })?; diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 14f9396d76..93987b553f 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -71,7 +71,8 @@ url = { workspace = true } arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } # for SessionStateBuilderSpark tests -datafusion = { workspace = true, default-features = false } +datafusion = { workspace = true, default-features = false, features = ["sql"] } +tokio = { workspace = true, features = ["rt"] } [[bench]] harness = false diff --git a/datafusion/spark/src/session_state.rs b/datafusion/spark/src/session_state.rs index e39de3a588..839487772a 100644 --- a/datafusion/spark/src/session_state.rs +++ b/datafusion/spark/src/session_state.rs @@ -88,6 +88,9 @@ impl SessionStateBuilderSpark for SessionStateBuilder { #[cfg(test)] mod tests { use super::*; + use datafusion::common::config::Dialect; + use datafusion::prelude::SessionConfig; + use datafusion::prelude::SessionContext; #[test] fn test_session_state_with_spark_features() { @@ -108,4 +111,37 @@ mod tests { "Apache Spark expr planners should be registered" ); } + + #[tokio::test] + async fn test_spark_dialect_with_spark_functions() { + let query = "SELECT sha2('abc', 256), CAST(1 AS LONG)"; + + let mut config = SessionConfig::new(); + config.options_mut().sql_parser.dialect = Dialect::Spark; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .with_spark_features() + .build(); + let ctx = SessionContext::new_with_state(state); + + let result = ctx.sql(query).await.unwrap().collect().await.unwrap(); + assert_eq!(result.len(), 1); + assert_eq!(result[0].num_rows(), 1); + + let mut config = SessionConfig::new(); + config.options_mut().sql_parser.dialect = Dialect::Generic; + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .with_spark_features() + .build(); + let ctx = SessionContext::new_with_state(state); + + let err = ctx.sql(query).await.unwrap_err().to_string(); + assert!( + err.contains("Unsupported SQL type LONG"), + "unexpected error: {err}" + ); + } } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index f9b7663108..8d437271fe 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -36,6 +36,7 @@ use arrow::record_batch::RecordBatch; use datafusion::catalog::{ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, Session, }; +use datafusion::common::config::Dialect; use datafusion::common::{DataFusionError, Result, not_impl_err}; use datafusion::functions::math::abs; use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; @@ -151,6 +152,9 @@ impl TestContext { if is_spark_path(relative_path) { state_builder = state_builder.with_spark_features(); + if let Some(config) = state_builder.config() { + config.options_mut().sql_parser.dialect = Dialect::Spark; + } } if matches!( diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 370492c2eb..840bff6ea6 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -505,7 +505,7 @@ datafusion.runtime.temp_directory NULL The path to the temporary file directory. datafusion.spark.map_key_dedup_policy EXCEPTION Policy for handling duplicate keys in Spark-compatible map-construction functions (`map_from_arrays`, `map_from_entries`, `str_to_map`). Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961): - `EXCEPTION` (default): raise `[DUPLICATED_MAP_KEY]` at runtime on any duplicate key. - `LAST_WIN`: [...] datafusion.sql_parser.collect_spans false When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. datafusion.sql_parser.default_null_ordering nulls_max Specifies the default null ordering for query results. There are 4 options: - `nulls_max`: Nulls appear last in ascending order. - `nulls_min`: Nulls appear first in ascending order. - `nulls_first`: Nulls always be first in any order. - `nulls_last`: Nulls always be last in any order. By default, `nulls_max` is used to follow Postgres's behavior. postgres rule: <https://www.postgresql.org/docs/current/queries-order.html> -datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. +datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks and Spark. datafusion.sql_parser.enable_ident_normalization true When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) datafusion.sql_parser.enable_options_value_normalization false When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. datafusion.sql_parser.enable_subquery_sort_elimination true When set to true, DataFusion may remove `ORDER BY` clauses from subqueries or CTEs during SQL planning when their ordering cannot affect the result, such as when no `LIMIT` or other order-sensitive operator depends on them. Disable this option to preserve explicit subquery ordering in the planned query. diff --git a/datafusion/sqllogictest/test_files/spark/collection/size.slt b/datafusion/sqllogictest/test_files/spark/collection/size.slt index 106760eebf..b9c445f4e6 100644 --- a/datafusion/sqllogictest/test_files/spark/collection/size.slt +++ b/datafusion/sqllogictest/test_files/spark/collection/size.slt @@ -84,7 +84,7 @@ SELECT size(make_array(1, NULL, 3)); # NULL array returns -1 (Spark behavior) query I -SELECT size(NULL::int[]); +SELECT size(CAST(NULL AS ARRAY<INT>)); ---- -1 diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index aa90cca10b..c71b2ccd6b 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -932,3 +932,8 @@ match register_function { RegisterFunction::Table(name, table) => {}, } ``` + +### New `Dialect::Spark` variant + +The `Dialect` enum in `datafusion_common::config` now includes a `Spark` variant. +If you match exhaustively on `Dialect`, add a `Dialect::Spark` arm. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 442b72ea9b..cc679549de 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -192,7 +192,7 @@ The following configuration settings are available: | datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type [...] | datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) [...] | datafusion.sql_parser.enable_options_value_normalization | false | When set to true, SQL parser will normalize options value (convert value to lowercase). Note that this option is ignored and will be removed in the future. All case-insensitive values are normalized automatically. [...] -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks. [...] +| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks and Spark. [...] | datafusion.sql_parser.support_varchar_with_length | true | If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but ignore the length. If false, error if a `VARCHAR` with a length is specified. The Arrow type system does not have a notion of maximum string length and thus DataFusion can not enforce such limits. [...] | datafusion.sql_parser.map_string_types_to_utf8view | true | If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning. If false, they are mapped to `Utf8`. Default is true. [...] | datafusion.sql_parser.collect_spans | false | When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes. [...] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
