This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ae7ea991 chore: Fix some regressions with Spark 3.5.1 (#674)
ae7ea991 is described below
commit ae7ea9912de2f7dc612a91b57308ece6bec4d74f
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jul 16 17:04:49 2024 -0600
chore: Fix some regressions with Spark 3.5.1 (#674)
---
dev/diffs/3.5.1.diff | 127 +--------------------
.../spark/sql/comet/shims/ShimCometScanExec.scala | 13 +--
2 files changed, 6 insertions(+), 134 deletions(-)
diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff
index cf675441..6892e868 100644
--- a/dev/diffs/3.5.1.diff
+++ b/dev/diffs/3.5.1.diff
@@ -1532,107 +1532,6 @@ index 68bae34790a..ea906fd1adc 100644
}
assert(shuffles2.size == 4)
val smj2 = findTopLevelSortMergeJoin(adaptive2)
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-index 15055a276fa..6e60b94dc3d 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
-
- import org.apache.spark.TestUtils
- import org.apache.spark.paths.SparkPath
--import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
-+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, IgnoreComet, QueryTest, Row}
- import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
- import org.apache.spark.sql.catalyst.trees.TreeNodeTag
- import org.apache.spark.sql.execution.FileSourceScanExec
-@@ -116,7 +116,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
- testName: String, fileSchema: StructType)
- (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
- Seq("json", "parquet").foreach { testFileFormat =>
-- test(s"metadata struct ($testFileFormat): " + testName) {
-+ test(s"metadata struct ($testFileFormat): " + testName,
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withTempDir { dir =>
- import scala.collection.JavaConverters._
-
-@@ -767,7 +769,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
-
- Seq(true, false).foreach { useVectorizedReader =>
- val label = if (useVectorizedReader) "reading batches" else "reading rows"
-- test(s"SPARK-39806: metadata for a partitioned table ($label)") {
-+ test(s"SPARK-39806: metadata for a partitioned table ($label)",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
useVectorizedReader.toString) {
- withTempPath { dir =>
- // Store dynamically partitioned data.
-@@ -789,7 +793,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
- }
-
- Seq("parquet", "orc").foreach { format =>
-- test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in
$format") {
-+ test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in
$format",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- // The issue was that ParquetFileFormat would not count the _metadata
columns towards
- // the WholeStageCodegenExec.isTooManyFields limit, while
FileSourceScanExec would,
- // resulting in Parquet reader returning columnar output, while scan
expected row.
-@@ -862,7 +868,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
- }
- }
-
-- test("SPARK-41896: Filter on constant and generated metadata attributes at
the same time") {
-+ test("SPARK-41896: Filter on constant and generated metadata attributes at
the same time",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withTempPath { dir =>
- val idColumnName = "id"
- val partitionColumnName = "partition"
-@@ -897,7 +905,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
- }
- }
-
-- test("SPARK-41896: Filter by a function that takes the metadata struct as
argument") {
-+ test("SPARK-41896: Filter by a function that takes the metadata struct as
argument",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withTempPath { dir =>
- val idColumnName = "id"
- val numFiles = 4
-@@ -984,7 +994,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
-
-
- Seq("parquet", "json", "csv", "text", "orc").foreach { format =>
-- test(s"metadata file path is url encoded for format: $format") {
-+ test(s"metadata file path is url encoded for format: $format",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withTempPath { f =>
- val dirWithSpace = s"$f/with space"
- spark.range(10)
-@@ -1002,7 +1014,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
- }
- }
-
-- test(s"metadata file name is url encoded for format: $format") {
-+ test(s"metadata file name is url encoded for format: $format",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- val suffix = if (format == "text") ".txt" else s".$format"
- withTempPath { f =>
- val dirWithSpace = s"$f/with space"
-@@ -1056,7 +1070,9 @@ class FileMetadataStructSuite extends QueryTest with
SharedSparkSession {
- }
- }
-
-- test("SPARK-43450: Filter on full _metadata column struct") {
-+ test("SPARK-43450: Filter on full _metadata column struct",
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withTempPath { dir =>
- val numRows = 10
- spark.range(end = numRows)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
index 05872d41131..a2c328b9742 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -1756,30 +1655,6 @@ index 07e2849ce6f..3e73645b638 100644
val extraOptions = Map[String, String](
ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-index c10e1799702..f18ca092dba 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-@@ -16,7 +16,7 @@
- */
- package org.apache.spark.sql.execution.datasources.parquet
-
--import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
-+import org.apache.spark.sql.{AnalysisException, DataFrame, IgnoreComet, QueryTest}
- import org.apache.spark.sql.execution.datasources.FileFormat
- import org.apache.spark.sql.functions.{col, lit}
- import org.apache.spark.sql.internal.SQLConf
-@@ -219,7 +219,9 @@ class ParquetFileMetadataStructRowIndexSuite extends
QueryTest with SharedSparkS
- }
- }
-
-- test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column") {
-+ // https://github.com/apache/datafusion-comet/issues/617
-+ test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column",
-+ IgnoreComet("TODO: fix Comet for this test")) {
- withReadDataFrame("parquet", partitionCol = "pb") { df =>
- withTempPath { dir =>
- // The `df` has 10 input files with 10 rows each. Therefore the
`_metadata.row_index` values
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..98d1eb07493 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2589,7 +2464,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index dd55fcfe42c..293e9dc2986 100644
+index dd55fcfe42c..e7fcd0a9e6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 3c6f764c..3c3e8c47 100644
--- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -19,14 +19,13 @@
package org.apache.spark.sql.comet.shims
-import org.apache.comet.shims.ShimFileFormat
import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -49,16 +48,14 @@ trait ShimCometScanExec {
filePartitions,
readSchema,
fileConstantMetadataColumns,
- Map.empty,
+ fsRelation.fileFormat.fileConstantMetadataExtractors,
options)
protected def invalidBucketFile(path: String, sparkVersion: String):
Throwable =
- new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null)
+ QueryExecutionErrors.invalidBucketFile(path)
- protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
- // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked
in SPARK-39634)
- ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0
- }
+ // see SPARK-39634
+ protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
protected def getPartitionedFile(f: FileStatusWithMetadata, p:
PartitionDirectory): PartitionedFile =
PartitionedFileUtil.getPartitionedFile(f, p.values)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]