This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 167baf7182 Fix: Do not normalize table names when deserializing from
protobuf (#18187)
167baf7182 is described below
commit 167baf718278112d427910a0296f366909bc3384
Author: Aldrin M <[email protected]>
AuthorDate: Thu Oct 23 18:13:30 2025 -0700
Fix: Do not normalize table names when deserializing from protobuf (#18187)
## Which issue does this PR close?
Closes #18122
## Rationale for this change
Existing behavior is to use the `relation` field of the `ColumnRelation`
message to construct a `TableReference` ([mod.rs#L146][fromcol_proto],
[mod.rs#L171][tryfrom_proto]). However, the `relation` field
is a string, and `From<String> for TableReference` always calls
`parse_identifiers_normalized` with `ignore_case: false`, which normalizes
the identifier to lowercase
([TableReference::parse_str][parse_str]).
For a higher-level description of the bug, see #18122.
## What changes are included in this PR?
This PR introduces the following:
1. An implementation of `From<protobuf::ColumnRelation>` for
`TableReference` that preserves the case of the relation name (borrowed
`ColumnRelation` values are cloned before conversion).
2. Updated logic in `TryFrom<&protobuf::DfSchema>` for `DFSchema` and in
`From<protobuf::Column>` for `Column`, so that the new `From` impl for
`TableReference` is invoked.
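3. A new method, `TableReference::parse_str_normalized(s, ignore_case)`,
that parses an identifier and normalizes it to lowercase only when
`ignore_case` is `false`. The protobuf path passes `true`, so the identifier
is not normalized. Part of `TableReference::parse_str` was refactored into a
private `from_vec` helper so both methods share the same logic.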
## Are these changes tested?
Commit a355196 adds a new test case,
`roundtrip_mixed_case_table_reference`, that tests the desired behavior.
The behavior before the fix (i.e. without 0616df2, and with the extra
line `println!("{}", server_logical_plan.display_indent_schema());` added to the test):
```
cargo test "roundtrip_mixed_case_table_reference" --test proto_integration -- --nocapture
Compiling datafusion-proto v48.0.1
(/Users/aldrinm/code/bauplanlabs/datafusion/octalene-datafusion/datafusion/proto)
Finished `test` profile [unoptimized + debuginfo] target(s) in 1.56s
Running tests/proto_integration.rs
(target/debug/deps/proto_integration-775454d70979734b)
running 1 test
thread
'cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference' panicked
at datafusion/proto/tests/cases/roundtrip_logical_plan.rs:2690:5:
assertion `left == right` failed
left: "Filter: TestData.a = Int64(1) [a:Int64;N]\n TableScan: TestData
projection=[a], partial_filters=[TestData.a = Int64(1)] [a:Int64;N]"
right: "Filter: testdata.a = Int64(1) [a:Int64;N]\n TableScan: TestData
projection=[a], partial_filters=[testdata.a = Int64(1)] [a:Int64;N]"
note: run with `RUST_BACKTRACE=1` environment variable to display a
backtrace
test cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference
... FAILED
failures:
failures:
cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 112
filtered out; finished in 0.09s
```
With the fix implemented (0616df2):
```
running 1 test
Filter: TestData.a = Int64(1) [a:Int64;N]
TableScan: TestData projection=[a], partial_filters=[TestData.a =
Int64(1)] [a:Int64;N]
test cases::roundtrip_logical_plan::roundtrip_mixed_case_table_reference
... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 112 filtered
out; finished in 0.06s
```
## Are there any user-facing changes?
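No breaking changes. Deserializing a plan from protobuf now preserves the
case of table qualifiers instead of lowercasing them. A new public method,
`TableReference::parse_str_normalized`, is also available.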
<!-- Resources -->
[fromcol_proto]:
https://github.com/apache/datafusion/blob/50.2.0/datafusion/proto-common/src/from_proto/mod.rs#L146
[tryfrom_proto]:
https://github.com/apache/datafusion/blob/50.2.0/datafusion/proto-common/src/from_proto/mod.rs#L171
[parse_str]:
https://github.com/apache/datafusion/blob/50.2.0/datafusion/common/src/table_reference.rs#L273
---
datafusion-cli/src/object_storage/instrumented.rs | 8 ++--
datafusion-cli/src/print_options.rs | 2 +-
datafusion/common/src/table_reference.rs | 47 +++++++++++++++-------
datafusion/common/src/utils/mod.rs | 3 ++
datafusion/proto-common/src/from_proto/mod.rs | 13 +++---
datafusion/proto/src/logical_plan/from_proto.rs | 6 +--
.../proto/tests/cases/roundtrip_logical_plan.rs | 46 +++++++++++++++++++++
7 files changed, 96 insertions(+), 29 deletions(-)
diff --git a/datafusion-cli/src/object_storage/instrumented.rs
b/datafusion-cli/src/object_storage/instrumented.rs
index 4465c59a90..c4b63b417f 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -58,7 +58,7 @@ pub enum InstrumentedObjectStoreMode {
impl fmt::Display for InstrumentedObjectStoreMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{:?}", self)
+ write!(f, "{self:?}")
}
}
@@ -426,7 +426,7 @@ pub enum Operation {
impl fmt::Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{:?}", self)
+ write!(f, "{self:?}")
}
}
@@ -556,11 +556,11 @@ impl RequestSummaries {
let size_stats = s.size_stats.as_ref();
let dur_avg = duration_stats.map(|d| {
let avg = d.sum.as_secs_f32() / count;
- format!("{:.6}s", avg)
+ format!("{avg:.6}s")
});
let size_avg = size_stats.map(|s| {
let avg = s.sum as f32 / count;
- format!("{} B", avg)
+ format!("{avg} B")
});
[dur_avg, size_avg]
})
diff --git a/datafusion-cli/src/print_options.rs
b/datafusion-cli/src/print_options.rs
index 5804617f39..93d1d450fd 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -206,7 +206,7 @@ impl PrintOptions {
writeln!(writer, "Summaries:")?;
let summaries = RequestSummaries::new(&requests);
- writeln!(writer, "{}", summaries)?;
+ writeln!(writer, "{summaries}")?;
}
}
}
diff --git a/datafusion/common/src/table_reference.rs
b/datafusion/common/src/table_reference.rs
index 7cf8e7af1a..5744658567 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -269,24 +269,41 @@ impl TableReference {
}
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
- /// identifier. See docs on [`TableReference`] for more details.
+ /// identifier, normalizing `s` to lowercase.
+ /// See docs on [`TableReference`] for more details.
pub fn parse_str(s: &str) -> Self {
- let mut parts = parse_identifiers_normalized(s, false);
+ Self::parse_str_normalized(s, false)
+ }
+
+ /// Forms a [`TableReference`] by parsing `s` as a multipart SQL
+ /// identifier, normalizing `s` to lowercase if `ignore_case` is `false`.
+ /// See docs on [`TableReference`] for more details.
+ pub fn parse_str_normalized(s: &str, ignore_case: bool) -> Self {
+ let table_parts = parse_identifiers_normalized(s, ignore_case);
+ Self::from_vec(table_parts).unwrap_or_else(|| Self::Bare { table:
s.into() })
+ }
+
+ /// Consume a vector of identifier parts to compose a [`TableReference`].
The input vector
+ /// should contain 1 <= N <= 3 elements in the following sequence:
+ /// ```no_rust
+ /// [<catalog>, <schema>, table]
+ /// ```
+ fn from_vec(mut parts: Vec<String>) -> Option<Self> {
match parts.len() {
- 1 => Self::Bare {
- table: parts.remove(0).into(),
- },
- 2 => Self::Partial {
- schema: parts.remove(0).into(),
- table: parts.remove(0).into(),
- },
- 3 => Self::Full {
- catalog: parts.remove(0).into(),
- schema: parts.remove(0).into(),
- table: parts.remove(0).into(),
- },
- _ => Self::Bare { table: s.into() },
+ 1 => Some(Self::Bare {
+ table: parts.pop()?.into(),
+ }),
+ 2 => Some(Self::Partial {
+ table: parts.pop()?.into(),
+ schema: parts.pop()?.into(),
+ }),
+ 3 => Some(Self::Full {
+ table: parts.pop()?.into(),
+ schema: parts.pop()?.into(),
+ catalog: parts.pop()?.into(),
+ }),
+ _ => None,
}
}
diff --git a/datafusion/common/src/utils/mod.rs
b/datafusion/common/src/utils/mod.rs
index c72e3b3a8d..045c02a5a2 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -285,6 +285,9 @@ pub(crate) fn parse_identifiers(s: &str) ->
Result<Vec<Ident>> {
Ok(idents)
}
+/// Parse a string into a vector of identifiers.
+///
+/// Note: If ignore_case is false, the string will be normalized to lowercase.
#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) ->
Vec<String> {
parse_identifiers(s)
diff --git a/datafusion/proto-common/src/from_proto/mod.rs
b/datafusion/proto-common/src/from_proto/mod.rs
index 2d07fb8410..4ede5b970e 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -138,11 +138,17 @@ where
}
}
+impl From<protobuf::ColumnRelation> for TableReference {
+ fn from(rel: protobuf::ColumnRelation) -> Self {
+ Self::parse_str_normalized(rel.relation.as_str(), true)
+ }
+}
+
impl From<protobuf::Column> for Column {
fn from(c: protobuf::Column) -> Self {
let protobuf::Column { relation, name } = c;
- Self::new(relation.map(|r| r.relation), name)
+ Self::new(relation, name)
}
}
@@ -164,10 +170,7 @@ impl TryFrom<&protobuf::DfSchema> for DFSchema {
.map(|df_field| {
let field: Field = df_field.field.as_ref().required("field")?;
Ok((
- df_field
- .qualifier
- .as_ref()
- .map(|q| q.relation.clone().into()),
+ df_field.qualifier.as_ref().map(|q| q.clone().into()),
Arc::new(field),
))
})
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 4296867049..507a0cec9d 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -315,8 +315,7 @@ pub fn parse_expr(
let null_treatment =
protobuf::NullTreatment::try_from(null_treatment)
.map_err(|_| {
proto_error(format!(
- "Received a WindowExprNode message with unknown
NullTreatment {}",
- null_treatment
+ "Received a WindowExprNode message with unknown
NullTreatment {null_treatment}",
))
})?;
Some(NullTreatment::from(null_treatment))
@@ -596,8 +595,7 @@ pub fn parse_expr(
let null_treatment =
protobuf::NullTreatment::try_from(null_treatment)
.map_err(|_| {
proto_error(format!(
- "Received an AggregateUdfExprNode message with
unknown NullTreatment {}",
- null_treatment
+ "Received an AggregateUdfExprNode message with
unknown NullTreatment {null_treatment}",
))
})?;
Some(NullTreatment::from(null_treatment))
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 516f178cc0..18cd8b8e66 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -2804,3 +2804,49 @@ async fn roundtrip_arrow_scan() -> Result<()> {
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
Ok(())
}
+
+#[tokio::test]
+async fn roundtrip_mixed_case_table_reference() -> Result<()> {
+ // Prepare "client" database
+ let client_ctx = SessionContext::new_with_config(
+ SessionConfig::new()
+ .set_bool("datafusion.sql_parser.enable_ident_normalization",
false),
+ );
+ client_ctx
+ .register_csv(
+ "\"TestData\"",
+ "tests/testdata/test.csv",
+ CsvReadOptions::default(),
+ )
+ .await?;
+
+ // Prepare "server" database
+ let server_ctx = SessionContext::new_with_config(
+ SessionConfig::new()
+ .set_bool("datafusion.sql_parser.enable_ident_normalization",
false),
+ );
+ server_ctx
+ .register_csv(
+ "\"TestData\"",
+ "tests/testdata/test.csv",
+ CsvReadOptions::default(),
+ )
+ .await?;
+
+ // Create a logical plan, serialize it (client), then deserialize it
(server)
+ let dataframe = client_ctx
+ .sql("SELECT a FROM TestData WHERE TestData.a = 1")
+ .await?;
+
+ let client_logical_plan = dataframe.into_optimized_plan()?;
+ let plan_bytes = logical_plan_to_bytes(&client_logical_plan)?;
+ let server_logical_plan =
+ logical_plan_from_bytes(&plan_bytes, &server_ctx.task_ctx())?;
+
+ assert_eq!(
+ format!("{}", client_logical_plan.display_indent_schema()),
+ format!("{}", server_logical_plan.display_indent_schema())
+ );
+
+ Ok(())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]