This is an automated email from the ASF dual-hosted git repository.
kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new a9868943e feat: make parquet native scan schema case insensitive (#1575)
a9868943e is described below
commit a9868943eec399185fe546cd9bd184c43ca0d757
Author: Zhen Wang <[email protected]>
AuthorDate: Sat Mar 29 01:36:52 2025 +0800
feat: make parquet native scan schema case insensitive (#1575)
## Which issue does this PR close?
Part of #1574.
## Rationale for this change
The data schema of a Spark Parquet datasource scan may not be consistent with
the actual schema of the file; for example, the file may have been written with a
lowercase column `a` while the table declares the column as `A`. This mismatch may
cause the `projection/filter` to not behave as expected.
## What changes are included in this PR?
This PR makes the matching between `data schema` and `file schema` case insensitive
in the schema adapter. It does not affect `pruning_predicate` or
`page_pruning_predicate`.
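For illustration, a minimal standalone sketch of the case-insensitive name matching
applied in the schema adapter (assuming the `arrow` crate; `find_field_index` and the
schemas below are hypothetical and not code from this PR):

```rust
use arrow::datatypes::{DataType, Field, Schema};

// Locate `name` in `schema`, optionally ignoring case.
// Illustrative helper only, not the actual adapter code.
fn find_field_index(schema: &Schema, name: &str, case_sensitive: bool) -> Option<usize> {
    schema.fields().iter().position(|f| {
        if case_sensitive {
            f.name() == name
        } else {
            f.name().to_lowercase() == name.to_lowercase()
        }
    })
}

fn main() {
    // A Parquet file written with a lowercase column name...
    let file_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    // ...looked up through a table schema that declares the column as `A`.
    assert_eq!(find_field_index(&file_schema, "A", true), None);
    assert_eq!(find_field_index(&file_schema, "A", false), Some(0));
    println!("case-insensitive lookup maps `A` -> file column 0");
}
```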
## How are these changes tested?
Added a unit test.
---
native/core/src/parquet/parquet_exec.rs | 1 +
native/core/src/parquet/parquet_support.rs | 4 ++
native/core/src/parquet/schema_adapter.rs | 51 ++++++++++++++++------
.../apache/comet/parquet/ParquetReadSuite.scala | 19 ++++++++
4 files changed, 62 insertions(+), 13 deletions(-)
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 85a3d023c..a39ee259f 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -110,6 +110,7 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpti
let mut spark_parquet_options =
SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
spark_parquet_options.allow_cast_unsigned_ints = true;
+ spark_parquet_options.case_sensitive = false;
(table_parquet_options, spark_parquet_options)
}
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 3d09a046a..ee830b795 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -64,6 +64,8 @@ pub struct SparkParquetOptions {
pub use_decimal_128: bool,
/// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian calendar as it is. If false, throw exceptions instead. If the spark type is TimestampNTZ, this should be true.
pub use_legacy_date_timestamp_or_ntz: bool,
+ // Whether schema field names are case sensitive
+ pub case_sensitive: bool,
}
impl SparkParquetOptions {
@@ -76,6 +78,7 @@ impl SparkParquetOptions {
is_adapting_schema: false,
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
+ case_sensitive: false,
}
}
@@ -88,6 +91,7 @@ impl SparkParquetOptions {
is_adapting_schema: false,
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
+ case_sensitive: false,
}
}
}
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index e9d1ff640..a387ec6f5 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -84,7 +84,20 @@ impl SchemaAdapter for SparkSchemaAdapter {
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.required_schema.field(index);
- Some(file_schema.fields.find(field.name())?.0)
+ Some(
+ file_schema
+ .fields
+ .iter()
+ .enumerate()
+ .find(|(_, b)| {
+ if self.parquet_options.case_sensitive {
+ b.name() == field.name()
+ } else {
+ b.name().to_lowercase() == field.name().to_lowercase()
+ }
+ })?
+ .0,
+ )
}
/// Creates a `SchemaMapping` for casting or mapping the columns from the
@@ -104,8 +117,18 @@ impl SchemaAdapter for SparkSchemaAdapter {
let mut field_mappings = vec![None; self.required_schema.fields().len()];
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
- if let Some((table_idx, _table_field)) =
- self.required_schema.fields().find(file_field.name())
+ if let Some((table_idx, _table_field)) = self
+ .required_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .find(|(_, b)| {
+ if self.parquet_options.case_sensitive {
+ b.name() == file_field.name()
+ } else {
+ b.name().to_lowercase() == file_field.name().to_lowercase()
+ }
+ })
{
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
@@ -234,16 +257,18 @@ impl SchemaMapper for SchemaMapping {
.zip(batch_cols.iter())
.flat_map(|(field, batch_col)| {
self.table_schema
- // try to get the same field from the table schema that we have stored in self
- .field_with_name(field.name())
- // and if we don't have it, that's fine, ignore it. This may occur when we've
- // created an external table whose fields are a subset of the fields in this
- // file, then tried to read data from the file into this table. If that is the
- // case here, it's fine to ignore because we don't care about this field
- // anyways
- .ok()
+ .fields()
+ .iter()
+ .enumerate()
+ .find(|(_, b)| {
+ if self.parquet_options.case_sensitive {
+ b.name() == field.name()
+ } else {
+ b.name().to_lowercase() == field.name().to_lowercase()
+ }
+ })
// but if we do have it,
- .map(|table_field| {
+ .map(|(_, table_field)| {
// try to cast it into the correct output type. we don't want to ignore this
// error, though, so it's propagated.
spark_parquet_convert(
@@ -253,7 +278,7 @@ impl SchemaMapper for SchemaMapping {
)?
.into_array(batch_col.len())
// and if that works, return the field and column.
- .map(|new_col| (new_col, table_field.clone()))
+ .map(|new_col| (new_col, table_field.as_ref().clone()))
})
})
.collect::<Result<Vec<_>, _>>()?
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 6e9b731d4..a6526e5fe 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1460,6 +1460,25 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
v1 = Some("parquet"))
}
}
+
+ test("test V1 parquet native scan -- case insensitive") {
+ withTempPath { path =>
+ spark.range(10).toDF("a").write.parquet(path.toString)
+ Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(
+ scanMode => {
+ withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) {
+ withTable("test") {
+ sql("create table test (A long) using parquet options (path '" +
path + "')")
+ val df = sql("select A from test")
+ checkSparkAnswer(df)
+ // TODO: pushed-down filters do not use the schema adapter in DataFusion, which will cause an empty result
+ // val df = sql("select * from test where A > 5")
+ // checkSparkAnswer(df)
+ }
+ }
+ })
+ }
+ }
}
class ParquetReadV2Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {