This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new de9f425f8 fix: [native_scans] Support `CASE_SENSITIVE` when reading Parquet (#1782)
de9f425f8 is described below

commit de9f425f8f3c7af1d837732bd315d27d7ec1edf3
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue May 27 08:07:51 2025 -0600

    fix: [native_scans] Support `CASE_SENSITIVE` when reading Parquet (#1782)
---
 common/src/main/java/org/apache/comet/parquet/Native.java |  3 ++-
 .../java/org/apache/comet/parquet/NativeBatchReader.java  |  8 +++++++-
 native/core/src/execution/planner.rs                      |  1 +
 native/core/src/parquet/mod.rs                            |  4 +++-
 native/core/src/parquet/parquet_exec.rs                   | 11 ++++++++---
 native/core/src/parquet/parquet_support.rs                | 15 ++++++++++++---
 native/proto/src/proto/operator.proto                     |  1 +
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala     |  1 +
 .../org/apache/comet/exec/CometNativeReaderSuite.scala    | 14 ++++++++++++++
 9 files changed, 49 insertions(+), 9 deletions(-)

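This change threads Spark's `spark.sql.caseSensitive` setting through the JNI and protobuf layers down to the native schema adapter, which previously hard-coded case-insensitive matching. As a rough usage sketch of the behavior covered by the fix (the path and table name are illustrative, not from the commit):

    // Scala, mirroring the new test below: the file has a lowercase column `a`,
    // while the declared table schema uses an uppercase `A`.
    spark.range(10).toDF("a").write.parquet("/tmp/case_test")
    spark.conf.set("spark.sql.caseSensitive", "false")
    spark.sql("CREATE TABLE t (A LONG) USING parquet OPTIONS (path '/tmp/case_test')")
    // With case-insensitive resolution, `A` resolves to the file's `a`; the native
    // scan must now return the same answer as Spark's built-in Parquet reader.
    spark.sql("SELECT A FROM t").show()
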
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index d4056c9ed..9070487ff 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -257,7 +257,8 @@ public final class Native extends NativeBase {
       byte[] requiredSchema,
       byte[] dataSchema,
       String sessionTimezone,
-      int batchSize);
+      int batchSize,
+      boolean caseSensitive);
 
   // arrow native version of read batch
   /**
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 8865e6f33..7a6a1d714 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -65,6 +65,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetColumn;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -405,6 +406,10 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
           conf.getInt(
               CometConf.COMET_BATCH_SIZE().key(),
               (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
+      boolean caseSensitive =
+          conf.getBoolean(
+              SQLConf.CASE_SENSITIVE().key(),
+              (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
       this.handle =
           Native.initRecordBatchReader(
               filePath,
@@ -415,7 +420,8 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
               serializedRequestedArrowSchema,
               serializedDataArrowSchema,
               timeZoneId,
-              batchSize);
+              batchSize,
+              caseSensitive);
     }
     isInitialized = true;
   }
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 202c187d3..60587a6fb 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1190,6 +1190,7 @@ impl PhysicalPlanner {
                     Some(data_filters?),
                     default_values,
                     scan.session_timezone.as_str(),
+                    scan.case_sensitive,
                 )?;
                 Ok((
                     vec![],
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 54fe23f59..b24591e9d 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -60,7 +60,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
-use jni::sys::jstring;
+use jni::sys::{jstring, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
 use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema};
@@ -657,6 +657,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     data_schema: jbyteArray,
     session_timezone: jstring,
     batch_size: jint,
+    case_sensitive: jboolean,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size 
as usize);
@@ -717,6 +718,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
             data_filters,
             None,
             session_timezone.as_str(),
+            case_sensitive != JNI_FALSE,
         )?;
 
         let partition_index: usize = 0;
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index f655f9157..13961ebf9 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -65,8 +65,10 @@ pub(crate) fn init_datasource_exec(
     data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
     default_values: Option<HashMap<usize, ScalarValue>>,
     session_timezone: &str,
+    case_sensitive: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
-    let (table_parquet_options, spark_parquet_options) = get_options(session_timezone);
+    let (table_parquet_options, spark_parquet_options) =
+        get_options(session_timezone, case_sensitive);
     let mut parquet_source =
         ParquetSource::new(table_parquet_options).with_schema_adapter_factory(Arc::new(
             SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
@@ -118,7 +120,10 @@ pub(crate) fn init_datasource_exec(
     Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
 }
 
-fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOptions) {
+fn get_options(
+    session_timezone: &str,
+    case_sensitive: bool,
+) -> (TableParquetOptions, SparkParquetOptions) {
     let mut table_parquet_options = TableParquetOptions::new();
     table_parquet_options.global.pushdown_filters = true;
     table_parquet_options.global.reorder_filters = true;
@@ -126,7 +131,7 @@ fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOpti
     let mut spark_parquet_options =
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
-    spark_parquet_options.case_sensitive = false;
+    spark_parquet_options.case_sensitive = case_sensitive;
     (table_parquet_options, spark_parquet_options)
 }
 
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 0a41efcec..4067afaea 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -211,7 +211,11 @@ fn cast_struct_to_struct(
             // TODO some of this logic may be specific to converting Parquet to Spark
             let mut field_name_to_index_map = HashMap::new();
             for (i, field) in from_fields.iter().enumerate() {
-                field_name_to_index_map.insert(field.name(), i);
+                if parquet_options.case_sensitive {
+                    field_name_to_index_map.insert(field.name().clone(), i);
+                } else {
+                    field_name_to_index_map.insert(field.name().to_lowercase(), i);
+                }
             }
             assert_eq!(field_name_to_index_map.len(), from_fields.len());
             let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
@@ -219,8 +223,13 @@ fn cast_struct_to_struct(
                 // Fields in the to_type schema may not exist in the from_type schema
                 // i.e. the required schema may have fields that the file does not
                 // have
-                if field_name_to_index_map.contains_key(to_fields[i].name()) {
-                    let from_index = field_name_to_index_map[to_fields[i].name()];
+                let key = if parquet_options.case_sensitive {
+                    to_fields[i].name().clone()
+                } else {
+                    to_fields[i].name().to_lowercase()
+                };
+                if field_name_to_index_map.contains_key(&key) {
+                    let from_index = field_name_to_index_map[&key];
                     let cast_field = cast_array(
                         Arc::clone(array.column(from_index)),
                         to_fields[i].data_type(),
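
The lookup rule above is the heart of the fix: the field-name index map and the probe key are lowercased only when the scan is case-insensitive. A minimal Scala sketch of the same rule, for illustration only (the actual implementation is the Rust in cast_struct_to_struct):

    // Returns the index of `wanted` among the file's struct fields, or None when
    // the required field is absent from the file (the required schema may be wider).
    def fieldIndex(fileFields: Seq[String], wanted: String, caseSensitive: Boolean): Option[Int] = {
      val norm: String => String = if (caseSensitive) identity else (_.toLowerCase)
      val byName = fileFields.zipWithIndex.map { case (n, i) => norm(n) -> i }.toMap
      byName.get(norm(wanted))
    }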
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index e16cdb212..9a41f977a 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -93,6 +93,7 @@ message NativeScan {
   string session_timezone = 9;
   repeated spark.spark_expression.Expr default_values = 10;
   repeated int64 default_values_indexes = 11;
+  bool case_sensitive = 12;
 }
 
 message Projection {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 1da3e984b..a98234585 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2370,6 +2370,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
           nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
           nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
           nativeScanBuilder.setSessionTimezone(conf.getConfString("spark.sql.session.timeZone"))
+          nativeScanBuilder.setCaseSensitive(conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
 
           Some(result.setNativeScan(nativeScanBuilder).build())
 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index 4115ba432..f8f13f9ad 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -44,6 +44,20 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       })
   }
 
+  test("native reader case sensitivity") {
+    withTempPath { path =>
+      spark.range(10).toDF("a").write.parquet(path.toString)
+      Seq(true, false).foreach { caseSensitive =>
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+          val tbl = s"case_sensitivity_${caseSensitive}_${System.currentTimeMillis()}"
+          sql(s"create table $tbl (A long) using parquet options (path '" + path + "')")
+          val df = sql(s"select A from $tbl")
+          checkSparkAnswer(df)
+        }
+      }
+    }
+  }
+
   test("native reader - read simple STRUCT fields") {
     testSingleLineQuery(
       """

