This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ae7ea991 chore: Fix some regressions with Spark 3.5.1 (#674)
ae7ea991 is described below

commit ae7ea9912de2f7dc612a91b57308ece6bec4d74f
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jul 16 17:04:49 2024 -0600

    chore: Fix some regressions with Spark 3.5.1 (#674)
---
 dev/diffs/3.5.1.diff                               | 127 +--------------------
 .../spark/sql/comet/shims/ShimCometScanExec.scala  |  13 +--
 2 files changed, 6 insertions(+), 134 deletions(-)

diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff
index cf675441..6892e868 100644
--- a/dev/diffs/3.5.1.diff
+++ b/dev/diffs/3.5.1.diff
@@ -1532,107 +1532,6 @@ index 68bae34790a..ea906fd1adc 100644
          }
          assert(shuffles2.size == 4)
          val smj2 = findTopLevelSortMergeJoin(adaptive2)
-diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-index 15055a276fa..6e60b94dc3d 100644
---- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
- 
- import org.apache.spark.TestUtils
- import org.apache.spark.paths.SparkPath
--import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, 
Row}
-+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, 
IgnoreComet, QueryTest, Row}
- import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
- import org.apache.spark.sql.catalyst.trees.TreeNodeTag
- import org.apache.spark.sql.execution.FileSourceScanExec
-@@ -116,7 +116,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
-       testName: String, fileSchema: StructType)
-     (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
-     Seq("json", "parquet").foreach { testFileFormat =>
--      test(s"metadata struct ($testFileFormat): " + testName) {
-+      test(s"metadata struct ($testFileFormat): " + testName,
-+          // https://github.com/apache/datafusion-comet/issues/617
-+          IgnoreComet("TODO: fix Comet for this test")) {
-         withTempDir { dir =>
-           import scala.collection.JavaConverters._
- 
-@@ -767,7 +769,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
- 
-   Seq(true, false).foreach { useVectorizedReader =>
-     val label = if (useVectorizedReader) "reading batches" else "reading rows"
--    test(s"SPARK-39806: metadata for a partitioned table ($label)") {
-+    test(s"SPARK-39806: metadata for a partitioned table ($label)",
-+        // https://github.com/apache/datafusion-comet/issues/617
-+        IgnoreComet("TODO: fix Comet for this test")) {
-       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
useVectorizedReader.toString) {
-         withTempPath { dir =>
-           // Store dynamically partitioned data.
-@@ -789,7 +793,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
-   }
- 
-   Seq("parquet", "orc").foreach { format =>
--    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in 
$format") {
-+    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in 
$format",
-+        // https://github.com/apache/datafusion-comet/issues/617
-+        IgnoreComet("TODO: fix Comet for this test")) {
-       // The issue was that ParquetFileFormat would not count the _metadata 
columns towards
-       // the WholeStageCodegenExec.isTooManyFields limit, while 
FileSourceScanExec would,
-       // resulting in Parquet reader returning columnar output, while scan 
expected row.
-@@ -862,7 +868,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
-     }
-   }
- 
--  test("SPARK-41896: Filter on constant and generated metadata attributes at 
the same time") {
-+  test("SPARK-41896: Filter on constant and generated metadata attributes at 
the same time",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-     withTempPath { dir =>
-       val idColumnName = "id"
-       val partitionColumnName = "partition"
-@@ -897,7 +905,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
-     }
-   }
- 
--  test("SPARK-41896: Filter by a function that takes the metadata struct as 
argument") {
-+  test("SPARK-41896: Filter by a function that takes the metadata struct as 
argument",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-     withTempPath { dir =>
-       val idColumnName = "id"
-       val numFiles = 4
-@@ -984,7 +994,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
- 
- 
-   Seq("parquet", "json", "csv", "text", "orc").foreach { format =>
--    test(s"metadata file path is url encoded for format: $format") {
-+    test(s"metadata file path is url encoded for format: $format",
-+        // https://github.com/apache/datafusion-comet/issues/617
-+        IgnoreComet("TODO: fix Comet for this test")) {
-       withTempPath { f =>
-         val dirWithSpace = s"$f/with space"
-         spark.range(10)
-@@ -1002,7 +1014,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
-       }
-     }
- 
--    test(s"metadata file name is url encoded for format: $format") {
-+    test(s"metadata file name is url encoded for format: $format",
-+        // https://github.com/apache/datafusion-comet/issues/617
-+        IgnoreComet("TODO: fix Comet for this test")) {
-       val suffix = if (format == "text") ".txt" else s".$format"
-       withTempPath { f =>
-         val dirWithSpace = s"$f/with space"
-@@ -1056,7 +1070,9 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
-     }
-   }
- 
--  test("SPARK-43450: Filter on full _metadata column struct") {
-+  test("SPARK-43450: Filter on full _metadata column struct",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-     withTempPath { dir =>
-       val numRows = 10
-       spark.range(end = numRows)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
 index 05872d41131..a2c328b9742 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -1756,30 +1655,6 @@ index 07e2849ce6f..3e73645b638 100644
      val extraOptions = Map[String, String](
        ParquetOutputFormat.WRITER_VERSION -> 
ParquetProperties.WriterVersion.PARQUET_2_0.toString
      )
-diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-index c10e1799702..f18ca092dba 100644
---- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-@@ -16,7 +16,7 @@
-  */
- package org.apache.spark.sql.execution.datasources.parquet
- 
--import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
-+import org.apache.spark.sql.{AnalysisException, DataFrame, IgnoreComet, 
QueryTest}
- import org.apache.spark.sql.execution.datasources.FileFormat
- import org.apache.spark.sql.functions.{col, lit}
- import org.apache.spark.sql.internal.SQLConf
-@@ -219,7 +219,9 @@ class ParquetFileMetadataStructRowIndexSuite extends 
QueryTest with SharedSparkS
-     }
-   }
- 
--  test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column") {
-+  // https://github.com/apache/datafusion-comet/issues/617
-+  test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column",
-+      IgnoreComet("TODO: fix Comet for this test")) {
-     withReadDataFrame("parquet", partitionCol = "pb") { df =>
-       withTempPath { dir =>
-         // The `df` has 10 input files with 10 rows each. Therefore the 
`_metadata.row_index` values
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 index 8e88049f51e..98d1eb07493 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2589,7 +2464,7 @@ index abe606ad9c1..2d930b64cca 100644
      val tblTargetName = "tbl_target"
      val tblSourceQualified = s"default.$tblSourceName"
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index dd55fcfe42c..293e9dc2986 100644
+index dd55fcfe42c..e7fcd0a9e6a 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
diff --git 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
 
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 3c6f764c..3c3e8c47 100644
--- 
a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ 
b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -19,14 +19,13 @@
 
 package org.apache.spark.sql.comet.shims
 
-import org.apache.comet.shims.ShimFileFormat
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -49,16 +48,14 @@ trait ShimCometScanExec {
     filePartitions,
     readSchema,
     fileConstantMetadataColumns,
-    Map.empty,
+    fsRelation.fileFormat.fileConstantMetadataExtractors,
     options)
 
   protected def invalidBucketFile(path: String, sparkVersion: String): 
Throwable =
-    new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null)
+    QueryExecutionErrors.invalidBucketFile(path)
 
-  protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
-    // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked 
in SPARK-39634)
-    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0
-  }
+  // see SPARK-39634
+  protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
 
   protected def getPartitionedFile(f: FileStatusWithMetadata, p: 
PartitionDirectory): PartitionedFile =
     PartitionedFileUtil.getPartitionedFile(f, p.values)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to