This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/comet-parquet-exec by this push:
     new 8563edfc fix: [comet-parquet-exec] fix regressions in original comet native scan implementation (#1170)
8563edfc is described below

commit 8563edfc66870983b8a008a18c3f0f0bfa276974
Author: Parth Chandra <[email protected]>
AuthorDate: Fri Dec 13 08:58:53 2024 -0800

    fix: [comet-parquet-exec] fix regressions in original comet native scan implementation (#1170)
    
    * fix: CometScanExec was created for unsupported cases if only COMET_NATIVE_SCAN is enabled
    
    * fix: Another try to fix test("Comet native metrics: BroadcastHashJoin")
    
    * fix: some tests are valid only when full native scan is enabled
    
    * Merge pull request #1 from andygrove/fix-tests-spark-cast-options
---
 .../main/java/org/apache/comet/parquet/BatchReader.java  |  2 +-
 .../java/org/apache/comet/parquet/NativeBatchReader.java |  8 ++++----
 native/spark-expr/src/cast.rs                            | 16 ++++++++--------
 .../main/scala/org/apache/comet/DataTypeSupport.scala    |  5 ++++-
 .../scala/org/apache/spark/sql/comet/CometScanExec.scala | 11 +++++++++--
 .../scala/org/apache/comet/CometExpressionSuite.scala    |  2 ++
 6 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 11c6d14d..675dae9e 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -272,7 +272,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
       requestedSchema =
           CometParquetReadSupport.clipParquetSchema(
              requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
-      if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+      if (requestedSchema.getFieldCount() != sparkSchema.size()) {
         throw new IllegalArgumentException(
             String.format(
                 "Spark schema has %d columns while " + "Parquet schema has %d 
columns",
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 17fab47e..3ac55ba4 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -245,7 +245,7 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
         requestedSchema =
             CometParquetReadSupport.clipParquetSchema(
                requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
-        if (requestedSchema.getColumns().size() != sparkSchema.size()) {
+        if (requestedSchema.getFieldCount() != sparkSchema.size()) {
           throw new IllegalArgumentException(
               String.format(
                   "Spark schema has %d columns while " + "Parquet schema has 
%d columns",
@@ -267,9 +267,9 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
     //    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
     for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
       Type t = requestedSchema.getFields().get(i);
-      Preconditions.checkState(
-          t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
-          "Complex type is not supported");
+      //      Preconditions.checkState(
+      //          t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
+      //          "Complex type is not supported");
       String[] colPath = paths.get(i);
      if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
        // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index 17ab73b7..a6d13971 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -571,7 +571,7 @@ impl SparkCastOptions {
             eval_mode,
             timezone: timezone.to_string(),
             allow_incompat,
-            is_adapting_schema: false,
+            is_adapting_schema: false
         }
     }
 
@@ -583,6 +583,7 @@ impl SparkCastOptions {
             is_adapting_schema: false,
         }
     }
+
 }
 
 /// Spark-compatible cast implementation. Defers to DataFusion's cast where that is known
@@ -2087,7 +2088,7 @@ mod tests {
 
         let timezone = "UTC".to_string();
         // test casting string dictionary array to timestamp array
-        let cast_options = SparkCastOptions::new(EvalMode::Legacy, timezone.clone(), false);
+        let cast_options = SparkCastOptions::new(EvalMode::Legacy, &timezone, false);
         let result = cast_array(
             dict_array,
             &DataType::Timestamp(TimeUnit::Microsecond, Some(timezone.clone().into())),
@@ -2296,7 +2297,7 @@ mod tests {
     fn test_cast_unsupported_timestamp_to_date() {
         // Since datafusion uses chrono::Datetime internally not all dates representable by TimestampMicrosecondType are supported
         let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
-        let cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC".to_string(), false);
+        let cast_options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
         let result = cast_array(
             Arc::new(timestamps.with_timezone("Europe/Copenhagen")),
             &DataType::Date32,
@@ -2309,7 +2310,7 @@ mod tests {
     fn test_cast_invalid_timezone() {
         let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
         let cast_options =
-            SparkCastOptions::new(EvalMode::Legacy, "Not a valid timezone".to_string(), false);
+            SparkCastOptions::new(EvalMode::Legacy, "Not a valid timezone", false);
         let result = cast_array(
             Arc::new(timestamps.with_timezone("Europe/Copenhagen")),
             &DataType::Date32,
@@ -2335,7 +2336,7 @@ mod tests {
         let string_array = cast_array(
             c,
             &DataType::Utf8,
-            &SparkCastOptions::new(EvalMode::Legacy, "UTC".to_owned(), false),
+            &SparkCastOptions::new(EvalMode::Legacy, "UTC", false),
         )
         .unwrap();
         let string_array = string_array.as_string::<i32>();
@@ -2400,10 +2401,9 @@ mod tests {
         let cast_array = spark_cast(
             ColumnarValue::Array(c),
             &DataType::Struct(fields),
-            EvalMode::Legacy,
+            &SparkCastOptions::new(EvalMode::Legacy,
             "UTC",
-            false,
-            false,
+            false)
         )
         .unwrap();
         if let ColumnarValue::Array(cast_array) = cast_array {
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index 09c062b8..eb524af9 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -39,7 +39,10 @@ trait DataTypeSupport {
         BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
       true
     case t: DataType if t.typeName == "timestamp_ntz" => true
-    case _: StructType => true
+    case _: StructType
+        if CometConf.COMET_FULL_NATIVE_SCAN_ENABLED
+          .get() || CometConf.COMET_NATIVE_ARROW_SCAN_ENABLED.get() =>
+      true
     case _ => false
   }
 
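
Put differently, struct columns are now only claimed as supported when one of the two native scan modes is enabled; otherwise Comet falls back to Spark. A standalone sketch of the gating (a simplification of DataTypeSupport.isTypeSupported, with the CometConf lookups replaced by plain booleans):

    import org.apache.spark.sql.types.{DataType, StructType}

    // Simplified stand-ins for COMET_FULL_NATIVE_SCAN_ENABLED and
    // COMET_NATIVE_ARROW_SCAN_ENABLED from the diff above.
    def isTypeSupported(dt: DataType, fullNativeScan: Boolean, nativeArrowScan: Boolean): Boolean =
      dt match {
        case _: StructType => fullNativeScan || nativeArrowScan
        case _ => true // other supported-type branches elided
      }

    assert(isTypeSupported(StructType(Nil), fullNativeScan = true, nativeArrowScan = false))
    assert(!isTypeSupported(StructType(Nil), fullNativeScan = false, nativeArrowScan = false))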
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 49f7694b..5d28b4b7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -131,8 +131,15 @@ case class CometScanExec(
   // exposed for testing
   lazy val bucketedScan: Boolean = wrapped.bucketedScan
 
-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
-    (wrapped.outputPartitioning, wrapped.outputOrdering)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+    if (bucketedScan) {
+      (wrapped.outputPartitioning, wrapped.outputOrdering)
+    } else {
+      val files = selectedPartitions.flatMap(partition => partition.files)
+      val numPartitions = files.length
+      (UnknownPartitioning(numPartitions), wrapped.outputOrdering)
+    }
+  }
 
   @transient
  private lazy val pushedDownFilters = getPushedDownFilters(relation, dataFilters)
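
The practical effect of reporting UnknownPartitioning for non-bucketed scans is that Spark no longer assumes the scan satisfies any required distribution and will plan an exchange where one is needed, which is plausibly what the BroadcastHashJoin metrics test was sensitive to. An illustrative check using only catalyst classes (the join key is hypothetical):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, UnknownPartitioning}
    import org.apache.spark.sql.types.IntegerType

    // Hypothetical join key, for illustration only.
    val key = AttributeReference("k", IntegerType)()

    // UnknownPartitioning never satisfies a clustered distribution, so
    // EnsureRequirements inserts a shuffle instead of trusting the scan's
    // previously inherited output partitioning.
    assert(!UnknownPartitioning(8).satisfies(ClusteredDistribution(Seq(key))))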
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 85ac6138..e65feb6b 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2217,6 +2217,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         withSQLConf(
           SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
           CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
           CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
 
           val df = spark.read.parquet(dir.toString())
@@ -2249,6 +2250,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         withSQLConf(
           SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
           CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_FULL_NATIVE_SCAN_ENABLED.key -> "true",
           CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
 
           val df = spark.read.parquet(dir.toString())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
