This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new d584229c build: Drop Spark 3.2 support (#581)
d584229c is described below
commit d584229c8e7d9f04b9df2803958b87714356860b
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue Jun 18 16:03:31 2024 -0700
build: Drop Spark 3.2 support (#581)
* build: Drop Spark 3.2 support
* remove unused import
* fix BloomFilterMightContain
* revert the changes for TimestampNTZType and PartitionIdPassthrough
* address comments and remove more 3.2 related code
* remove unused import
* put back newDataSourceRDD
* remove unused import and put back lazy val partitions
* address comments
* Trigger Build
* remove the missed 3.2 pipeline
* address comments
---
.github/workflows/pr_build.yml | 15 +---
.../apache/comet/parquet/CometParquetUtils.scala | 3 +-
.../comet/parquet/CometParquetReadSupport.scala | 13 ++--
.../CometSparkToParquetSchemaConverter.scala | 5 +-
.../org/apache/comet/shims/ShimBatchReader.scala | 2 +-
.../org/apache/comet/shims/ShimFileFormat.scala | 7 +-
.../comet/shims/ShimResolveDefaultColumns.scala | 2 +-
.../sql/comet/shims/ShimCometParquetUtils.scala | 81 ----------------------
.../sql/comet/shims/ShimCometParquetUtils.scala | 38 ----------
core/src/errors.rs | 2 +-
.../contributor-guide/adding_a_new_expression.md | 9 +--
docs/source/user-guide/installation.md | 2 +-
docs/source/user-guide/overview.md | 2 +-
pom.xml | 14 ----
.../apache/comet/CometSparkSessionExtensions.scala | 12 +---
.../org/apache/comet/parquet/ParquetFilters.scala | 4 +-
.../org/apache/comet/serde/QueryPlanSerde.scala | 18 ++---
.../spark/sql/comet/CometBatchScanExec.scala | 10 +--
.../org/apache/spark/sql/comet/CometScanExec.scala | 6 +-
.../shuffle/CometShuffleExchangeExec.scala | 4 +-
.../comet/plans/AliasAwareOutputExpression.scala | 2 +-
.../PartitioningPreservingUnaryExecNode.scala | 2 +-
.../org/apache/comet/shims/CometExprShim.scala | 37 ----------
.../comet/shims/ShimCometBatchScanExec.scala | 14 +---
.../shims/ShimCometBroadcastHashJoinExec.scala | 4 +-
.../comet/shims/ShimCometShuffleExchangeExec.scala | 2 +-
.../shims/ShimCometSparkSessionExtensions.scala | 13 +---
.../shims/ShimCometTakeOrderedAndProjectExec.scala | 2 +-
.../apache/comet/shims/ShimQueryPlanSerde.scala | 9 +--
.../org/apache/comet/shims/ShimSQLConf.scala | 2 +-
.../shims/ShimCometBroadcastExchangeExec.scala | 2 +-
.../spark/sql/comet/shims/ShimCometScanExec.scala | 47 ++++---------
.../comet/shims/ShimCometBatchScanExec.scala | 7 +-
.../spark/sql/comet/shims/ShimCometScanExec.scala | 12 ----
.../scala/org/apache/comet/CometCastSuite.scala | 20 +-----
.../org/apache/comet/CometExpressionSuite.scala | 26 ++-----
.../apache/comet/exec/CometAggregateSuite.scala | 2 +-
.../apache/comet/parquet/ParquetReadSuite.scala | 15 ++--
.../org/apache/spark/sql/CometTPCHQuerySuite.scala | 39 +----------
.../spark/sql/benchmark/CometReadBenchmark.scala | 4 +-
.../spark/sql/comet/CometPlanStabilitySuite.scala | 2 +-
.../apache/spark/comet/shims/ShimTestUtils.scala | 43 ------------
42 files changed, 87 insertions(+), 468 deletions(-)
diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index 2bf02335..981905ec 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -109,15 +109,8 @@ jobs:
os: [ubuntu-latest]
java_version: [8, 11, 17]
test-target: [java]
- spark-version: ['3.2', '3.3']
+ spark-version: ['3.3']
scala-version: ['2.12', '2.13']
- exclude:
- - java_version: 17
- spark-version: '3.2'
- - java_version: 11
- spark-version: '3.2'
- - spark-version: '3.2'
- scala-version: '2.13'
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
@@ -254,15 +247,11 @@ jobs:
matrix:
java_version: [8, 17]
test-target: [java]
- spark-version: ['3.2', '3.3']
+ spark-version: ['3.3']
scala-version: ['2.12', '2.13']
exclude:
- - java_version: 17
- spark-version: '3.2'
- java_version: 8
spark-version: '3.3'
- - spark-version: '3.2'
- scala-version: '2.13'
fail-fast: false
name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: macos-14
diff --git a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala
index d03252d0..a37ec7e6 100644
--- a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala
+++ b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala
@@ -20,10 +20,9 @@
package org.apache.comet.parquet
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
import org.apache.spark.sql.internal.SQLConf
-object CometParquetUtils extends ShimCometParquetUtils {
+object CometParquetUtils {
private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
index 0e8a190c..4523a057 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
@@ -27,10 +27,9 @@ import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types._
-import org.apache.comet.parquet.CometParquetUtils
-
/**
* This class is copied & slightly modified from [[ParquetReadSupport]] in Spark. Changes:
* - This doesn't extend from Parquet's `ReadSupport` class since that is used for row-based
@@ -53,7 +52,7 @@ object CometParquetReadSupport {
ignoreMissingIds: Boolean): MessageType = {
if (!ignoreMissingIds &&
!containsFieldIds(parquetSchema) &&
- CometParquetUtils.hasFieldIds(catalystSchema)) {
+ ParquetUtils.hasFieldIds(catalystSchema)) {
throw new RuntimeException(
"Spark read schema expects field Ids, " +
"but Parquet file schema doesn't contain any field Ids.\n" +
@@ -334,14 +333,14 @@ object CometParquetReadSupport {
}
def matchIdField(f: StructField): Type = {
- val fieldId = CometParquetUtils.getFieldId(f)
+ val fieldId = ParquetUtils.getFieldId(f)
idToParquetFieldMap
.get(fieldId)
.map { parquetTypes =>
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
- throw CometParquetUtils.foundDuplicateFieldInFieldIdLookupModeError(
+ throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
fieldId,
parquetTypesString)
} else {
@@ -355,9 +354,9 @@ object CometParquetReadSupport {
}
}
- val shouldMatchById = useFieldId && CometParquetUtils.hasFieldIds(structType)
+ val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
structType.map { f =>
- if (shouldMatchById && CometParquetUtils.hasFieldId(f)) {
+ if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
matchIdField(f)
} else if (caseSensitive) {
matchCaseSensitiveField(f)
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
index 2c8187e1..56adc460 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -66,8 +67,8 @@ class CometSparkToParquetSchemaConverter(
*/
def convertField(field: StructField): Type = {
val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
- if (useFieldId && CometParquetUtils.hasFieldId(field)) {
- converted.withId(CometParquetUtils.getFieldId(field))
+ if (useFieldId && ParquetUtils.hasFieldId(field)) {
+ converted.withId(ParquetUtils.getFieldId(field))
} else {
converted
}
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala
index ece4cfbe..18f91acc 100644
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile
object ShimBatchReader {
- // TODO: remove after dropping Spark 3.2 & 3.3 support and directly call PartitionedFile
+ // TODO: remove after dropping Spark 3.3 support and directly call PartitionedFile
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
classOf[PartitionedFile].getDeclaredConstructors
.map(c =>
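For context: `newPartitionedFile` reflects over `PartitionedFile`'s constructors because the constructor signature differs across Spark 3.x releases. A hedged usage sketch (the row and path below are illustrative, not from this commit):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.comet.shims.ShimBatchReader

// Illustrative only: empty partition values and a made-up local file path.
val file: PartitionedFile =
  ShimBatchReader.newPartitionedFile(InternalRow.empty, "file:///tmp/part-00000.parquet")
```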
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
index 5ab7eaf4..685e8f56 100644
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
@@ -21,15 +21,12 @@ package org.apache.comet.shims
object ShimFileFormat {
- // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use FileFormat.ROW_INDEX
+ // TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX
val ROW_INDEX = "row_index"
// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
- // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use
+ // TODO: remove after dropping Spark 3.3 support and directly use
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"
-
- // TODO: remove after dropping Spark 3.2 support and use FileFormat.OPTION_RETURNING_BATCH
- val OPTION_RETURNING_BATCH = "returning_batch"
}
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
index 8a30c8e0..4f7d4983 100644
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
@@ -24,7 +24,7 @@ import scala.util.Try
import org.apache.spark.sql.types.{StructField, StructType}
object ShimResolveDefaultColumns {
- // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use ResolveDefaultColumns
+ // TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns
def getExistenceDefaultValue(field: StructField): Any =
Try {
// scalastyle:off classforname
diff --git a/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala b/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
deleted file mode 100644
index f22ac406..00000000
--- a/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.sql.types._
-
-trait ShimCometParquetUtils {
- // The following is copied from QueryExecutionErrors
- // TODO: remove after dropping Spark 3.2.0 support and directly use
- // QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError
- def foundDuplicateFieldInFieldIdLookupModeError(
- requiredId: Int,
- matchedFields: String): Throwable = {
- new RuntimeException(s"""
- |Found duplicate field(s) "$requiredId": $matchedFields
- |in id mapping mode
- """.stripMargin.replaceAll("\n", " "))
- }
-
- // The followings are copied from org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
- // TODO: remove after dropping Spark 3.2.0 support and directly use ParquetUtils
- /**
- * A StructField metadata key used to set the field id of a column in the Parquet schema.
- */
- val FIELD_ID_METADATA_KEY = "parquet.field.id"
-
- /**
- * Whether there exists a field in the schema, whether inner or leaf, has the parquet field ID
- * metadata.
- */
- def hasFieldIds(schema: StructType): Boolean = {
- def recursiveCheck(schema: DataType): Boolean = {
- schema match {
- case st: StructType =>
- st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
-
- case at: ArrayType => recursiveCheck(at.elementType)
-
- case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
-
- case _ =>
- // No need to really check primitive types, just to terminate the recursion
- false
- }
- }
- if (schema.isEmpty) false else recursiveCheck(schema)
- }
-
- def hasFieldId(field: StructField): Boolean =
- field.metadata.contains(FIELD_ID_METADATA_KEY)
-
- def getFieldId(field: StructField): Int = {
- require(
- hasFieldId(field),
- s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " +
field)
- try {
- Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
- } catch {
- case _: ArithmeticException | _: ClassCastException =>
- throw new IllegalArgumentException(
- s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer")
- }
- }
-}
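For context: from Spark 3.3 onward the helpers this deleted shim duplicated are available directly on `ParquetUtils`, which is what the hunks above switch to. A minimal sketch of that replacement API (the schema here is illustrative; field ids live in `StructField` metadata under the `parquet.field.id` key shown above):

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types._

// Attach a Parquet field id to a column via StructField metadata.
val withId = new MetadataBuilder().putLong("parquet.field.id", 1L).build()
val schema = StructType(Seq(StructField("c0", IntegerType, nullable = true, withId)))

assert(ParquetUtils.hasFieldIds(schema))          // some field in the schema carries an id
assert(ParquetUtils.hasFieldId(schema.head))      // this particular field carries an id
assert(ParquetUtils.getFieldId(schema.head) == 1) // ids must fit in a 32-bit integer
```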
diff --git a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala b/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
deleted file mode 100644
index d402cd78..00000000
--- a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
-import org.apache.spark.sql.types._
-
-trait ShimCometParquetUtils {
- def foundDuplicateFieldInFieldIdLookupModeError(
- requiredId: Int,
- matchedFields: String): Throwable = {
- QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(requiredId, matchedFields)
- }
-
- def hasFieldIds(schema: StructType): Boolean = ParquetUtils.hasFieldIds(schema)
-
- def hasFieldId(field: StructField): Boolean = ParquetUtils.hasFieldId(field)
-
- def getFieldId(field: StructField): Int = ParquetUtils.getFieldId (field)
-}
diff --git a/core/src/errors.rs b/core/src/errors.rs
index 493880c3..b38c5e90 100644
--- a/core/src/errors.rs
+++ b/core/src/errors.rs
@@ -61,7 +61,7 @@ pub enum CometError {
Internal(String),
// Note that this message format is based on Spark 3.4 and is more detailed than the message
- // returned by Spark 3.2 or 3.3
+ // returned by Spark 3.3
#[error("[CAST_INVALID_INPUT] The value '{value}' of the type
\"{from_type}\" cannot be cast to \"{to_type}\" \
because it is malformed. Correct the value as per the syntax, or
change its target type. \
Use `try_cast` to tolerate malformed input and return NULL instead. If
necessary \
diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md
index 6cf10c75..6d906c66 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -46,7 +46,7 @@ The `QueryPlanSerde` object has a method `exprToProto`, which is responsible for
For example, the `unhex` function looks like this:
```scala
-case e: Unhex if !isSpark32 =>
+case e: Unhex =>
val unHex = unhexSerde(e)
val childExpr = exprToProtoInternal(unHex._1, inputs)
@@ -59,7 +59,6 @@ case e: Unhex if !isSpark32 =>
A few things to note here:
-* The `isSpark32` check is used to fall back to Spark's implementation of `unhex` in Spark 3.2. This is somewhat context specific, because in this case, due to a bug in Spark 3.2 for `unhex`, we want to use the Spark implementation and not a Comet implementation that would behave differently if correct.
* The function is recursively called on child expressions, so you'll need to make sure that the child expressions are also converted to protobuf.
* `scalarExprToProtoWithReturnType` is for scalar functions that need return type information. Your expression may use a different method depending on the type of expression.
@@ -71,8 +70,6 @@ For example, this is the test case for the `unhex` expression:
```scala
test("unhex") {
- assume(!isSpark32, "unhex function has incorrect behavior in 3.2") // used
to skip the test in Spark 3.2
-
val table = "unhex_table"
withTable(table) {
sql(s"create table $table(col string) using parquet")
@@ -172,11 +169,11 @@ pub(super) fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue, DataF
If the expression you're adding has different behavior across different Spark versions, you'll need to account for that in your implementation. There are two tools at your disposal to help with this:
1. Shims that exist in `spark/src/main/spark-$SPARK_VERSION/org/apache/comet/shims/CometExprShim.scala` for each Spark version. These shims are used to provide compatibility between different Spark versions.
-2. Variables that correspond to the Spark version, such as `isSpark32`, which can be used to conditionally execute code based on the Spark version.
+2. Variables that correspond to the Spark version, such as `isSpark33Plus`, which can be used to conditionally execute code based on the Spark version.
## Shimming to Support Different Spark Versions
-By adding shims for each Spark version, you can provide a consistent interface for the expression across different Spark versions. For example, `unhex` added a new optional parameter in Spark 3.4, for whether it should `failOnError` or not. So for version 3.2 and 3.3, the shim is:
+By adding shims for each Spark version, you can provide a consistent interface for the expression across different Spark versions. For example, `unhex` added a new optional parameter in Spark 3.4, for whether it should `failOnError` or not. So for version 3.3, the shim is:
```scala
trait CometExprShim {
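    // NOTE: body shown here for readability; it is reconstructed from the
    // (now deleted) Spark 3.2 shim later in this diff, which supplies the
    // missing `failOnError` argument as Literal(false).
    protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
        (unhex.child, Literal(false))
    }
}
```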
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index 7335a488..bdf6c0e0 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -28,7 +28,7 @@ Make sure the following requirements are met and software installed on your mach
## Requirements
-- Apache Spark 3.2, 3.3, or 3.4
+- Apache Spark 3.3 or 3.4
- JDK 8 and up
- GLIBC 2.17 (Centos 7) and up
diff --git a/docs/source/user-guide/overview.md b/docs/source/user-guide/overview.md
index b5425d77..87f5f286 100644
--- a/docs/source/user-guide/overview.md
+++ b/docs/source/user-guide/overview.md
@@ -40,7 +40,7 @@ The following diagram illustrates the architecture of Comet:
## Current Status
-The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
+The project is currently integrated into Apache Spark 3.3 and 3.4.
## Feature Parity with Apache Spark
diff --git a/pom.xml b/pom.xml
index 788ee3d2..4407a9d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -517,20 +517,6 @@ under the License.
</properties>
</profile>
- <profile>
- <id>spark-3.2</id>
- <properties>
- <scala.version>2.12.15</scala.version>
- <spark.version>3.2.2</spark.version>
- <spark.version.short>3.2</spark.version.short>
- <parquet.version>1.12.0</parquet.version>
- <!-- we don't add special test suits for spark-3.2, so a not existed dir is specified-->
- <additional.3_3.test.source>not-needed-yet</additional.3_3.test.source>
- <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
- <shims.minorVerSrc>spark-3.2</shims.minorVerSrc>
- </properties>
- </profile>
-
<profile>
<id>spark-3.3</id>
<properties>
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 0136b62a..e939b43a 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -96,7 +96,7 @@ class CometSparkSessionExtensions
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) &&
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) &&
// Comet does not support pushedAggregate
- getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isEmpty =>
+ scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty =>
val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
logInfo("Comet extension enabled for Scan")
CometBatchScanExec(
@@ -116,7 +116,7 @@ class CometSparkSessionExtensions
s"Partition schema $readPartitionSchema is not supported")
// Comet does not support pushedAggregate
val info3 = createMessage(
- getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
+ scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined,
"Comet does not support pushed aggregate")
withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
scanExec
@@ -992,8 +992,7 @@ object CometSparkSessionExtensions extends Logging {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
- // `TimestampNTZType` is private in Spark 3.2.
- case t: DataType if t.typeName == "timestamp_ntz" && !isSpark32 => true
+ case t: DataType if t.typeName == "timestamp_ntz" => true
case dt =>
logInfo(s"Comet extension is disabled because data type $dt is not
supported")
false
@@ -1015,11 +1014,6 @@ object CometSparkSessionExtensions extends Logging {
}
}
- /** Used for operations that weren't available in Spark 3.2 */
- def isSpark32: Boolean = {
- org.apache.spark.SPARK_VERSION.matches("3\\.2\\..*")
- }
-
def isSpark33Plus: Boolean = {
org.apache.spark.SPARK_VERSION >= "3.3"
}
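For context: `isSpark34Plus`, used throughout this commit, presumably mirrors the string comparison in `isSpark33Plus` above; a sketch:

```scala
def isSpark34Plus: Boolean = {
  org.apache.spark.SPARK_VERSION >= "3.4"
}
```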
diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index 58c2aeb4..17844aba 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -45,8 +45,8 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.shims.ShimSQLConf
/**
- * Copied from Spark 3.2 & 3.4, in order to fix Parquet shading issue. TODO: find a way to remove
- * this duplication
+ * Copied from Spark 3.4, in order to fix Parquet shading issue. TODO: find a way to remove this
+ * duplication
*
* Some utility function to convert Spark data source filters to Parquet filters.
*/
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7c4f5f25..67ecfe52 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark34Plus, withInfo}
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, Unsupported}
import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo}
@@ -63,7 +63,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
_: DateType | _: BooleanType | _: NullType =>
true
- // `TimestampNTZType` is private in Spark 3.2.
case dt if dt.typeName == "timestamp_ntz" => true
case dt =>
emitWarning(s"unsupported Spark data type: $dt")
@@ -1413,7 +1412,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
case UnaryExpression(child) if expr.prettyName == "promote_precision"
=>
- // `UnaryExpression` includes `PromotePrecision` for Spark 3.2 & 3.3
+ // `UnaryExpression` includes `PromotePrecision` for Spark 3.3
// `PromotePrecision` is just a wrapper, don't need to serialize it.
exprToProtoInternal(child, inputs)
@@ -1518,7 +1517,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
optExprWithInfo(optExpr, expr, child)
- case e: Unhex if !isSpark32 =>
+ case e: Unhex =>
val unHex = unhexSerde(e)
val childExpr = exprToProtoInternal(unHex._1, inputs)
@@ -1585,9 +1584,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
val optExpr = scalarExprToProto("pow", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, left, right)
- // round function for Spark 3.2 does not allow negative round target scale. In addition,
- // it has different result precision/scale for decimals. Supporting only 3.3 and above.
- case r: Round if !isSpark32 =>
+ case r: Round =>
// _scale is a constant, copied from Spark's RoundBase because it is a protected val
val scaleV: Any = r.scale.eval(EmptyRow)
val _scale: Int = scaleV.asInstanceOf[Int]
@@ -2066,7 +2063,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
childExpr)
optExprWithInfo(optExpr, expr, child)
- case b @ BinaryExpression(_, _) if isBloomFilterMightContain(b) =>
+ case b @ BloomFilterMightContain(_, _) =>
val bloomFilter = b.left
val value = b.right
val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs)
@@ -2244,7 +2241,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType =>
true
- // `TimestampNTZType` is private in Spark 3.2/3.3.
+ // `TimestampNTZType` is private in Spark 3.3.
case dt if dt.typeName == "timestamp_ntz" => true
case _ => false
}
@@ -2322,12 +2319,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
val limitBuilder = OperatorOuterClass.Limit.newBuilder()
- // Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions
- // support it. Before we upgrade to Spark 3.3, just set it zero.
// TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
// When we upgrade to Spark 3.3., we need to address it here.
limitBuilder.setLimit(globalLimitExec.limit)
- limitBuilder.setOffset(0)
Some(result.setLimit(limitBuilder).build())
} else {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index d6c3c87a..82ebed95 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -44,6 +44,10 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
wrapped.logicalLink.foreach(setLogicalLink)
+ def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.keyGroupedPartitioning
+
+ def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions
+
override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -144,11 +148,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
}
}
- // Intentionally omitting the return type as it is different depending on Spark version
- // Spark 3.2.x Seq[InputPartition]
- // Spark 3.3.x Seq[Seq[InputPartition]]
- // TODO: add back the return type after dropping Spark 3.2.0 support
- @transient override lazy val partitions = wrappedScan.partitions
+ @transient override lazy val partitions: Seq[Seq[InputPartition]] = wrappedScan.partitions
override def supportsColumnar: Boolean = true
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 9a5b55d6..ca99e36b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.shims.ShimCometScanExec
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -44,7 +45,6 @@ import org.apache.spark.util.collection._
import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
-import org.apache.comet.shims.ShimFileFormat
/**
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
@@ -150,7 +150,7 @@ case class CometScanExec(
lazy val inputRDD: RDD[InternalRow] = {
val options = relation.options +
- (ShimFileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
+ (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -402,7 +402,7 @@ case class CometScanExec(
new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf),
metrics)
- newDataSourceRDD(
+ new DataSourceRDD(
fsRelation.sparkSession.sparkContext,
partitions.map(Seq(_)),
partitionReaderFactory,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 3f4d7bfd..aabe3c35 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -583,14 +583,14 @@ class CometShuffleWriteProcessor(
}
/**
- * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.2.
+ * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.3.
*/
private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
/**
- * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.2.
+ * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.3.
*/
private[spark] class ConstantPartitioner extends Partitioner {
override def numPartitions: Int = 1
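For context: a small assertion-style sketch of the two partitioners above (the `ConstantPartitioner` body is truncated in this hunk; returning partition 0 for every key matches Spark's own version):

```scala
val byKey = new PartitionIdPassthrough(8)
assert(byKey.getPartition(3) == 3) // the Int key already is the partition id

val single = new ConstantPartitioner
assert(single.numPartitions == 1)
assert(single.getPartition("anything") == 0) // every key lands in partition 0
```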
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
index 996526e5..f28ca2bb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
@@ -88,7 +88,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
}
}
- // Copied from Spark 3.4+ to make it available in Spark 3.2+.
+ // Copied from Spark 3.4+ to make it available in Spark 3.3+.
def multiTransformDown(expr: Expression)(
rule: PartialFunction[Expression, Seq[Expression]]): Stream[Expression] = {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
index 8c6f0af1..d584e860 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
* satisfies distribution requirements.
*
* This is copied from Spark's `PartitioningPreservingUnaryExecNode` because it is only available
- * in Spark 3.4+. This is a workaround to make it available in Spark 3.2+.
+ * in Spark 3.4+. This is a workaround to make it available in Spark 3.3+.
*/
trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
diff --git a/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala
deleted file mode 100644
index 2c6f6ccf..00000000
--- a/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.comet.shims
-
-import org.apache.comet.expressions.CometEvalMode
-import org.apache.spark.sql.catalyst.expressions._
-
-/**
- * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
- */
-trait CometExprShim {
- /**
- * Returns a tuple of expressions for the `unhex` function.
- */
- protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
- (unhex.child, Literal(false))
- }
-
- protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalMode.fromBoolean(c.ansiEnabled)
-}
-
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
index 9e7cc3ba..49e93111 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
@@ -19,24 +19,12 @@
package org.apache.comet.shims
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
trait ShimCometBatchScanExec {
def wrapped: BatchScanExec
- // Only for Spark 3.3+
- def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.getClass.getDeclaredMethods
- .filter(_.getName == "keyGroupedPartitioning")
- .flatMap(_.invoke(wrapped).asInstanceOf[Option[Seq[Expression]]])
- .headOption
-
- // Only for Spark 3.3+
- def inputPartitions: Seq[InputPartition] = wrapped.getClass.getDeclaredMethods
- .filter(_.getName == "inputPartitions")
- .flatMap(_.invoke(wrapped).asInstanceOf[Seq[InputPartition]])
-
// Only for Spark 3.4+
def ordering: Option[Seq[SortOrder]] = wrapped.getClass.getDeclaredMethods
.filter(_.getName == "ordering")
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
index eef0ee9d..442bb9e5 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
@@ -27,9 +27,9 @@ trait ShimCometBroadcastHashJoinExec {
/**
* Returns the expressions that are used for hash partitioning including `HashPartitioning` and
* `CoalescedHashPartitioning`. They shares same trait `HashPartitioningLike` since Spark 3.4,
- * but Spark 3.2/3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`.
+ * but Spark 3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`.
*
- * TODO: remove after dropping Spark 3.2 and 3.3 support.
+ * TODO: remove after dropping Spark 3.3 support.
*/
def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] = {
partitioning.getClass.getDeclaredMethods
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
index 350aeb9f..965b6851 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.types.{StructField, StructType}
trait ShimCometShuffleExchangeExec {
- // TODO: remove after dropping Spark 3.2 and 3.3 support
+ // TODO: remove after dropping Spark 3.3 support
def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
val advisoryPartitionSize = s.getClass.getDeclaredMethods
.filter(_.getName == "advisoryPartitionSize")
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 37748533..c8aeacf2 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -19,22 +19,11 @@
package org.apache.comet.shims
-import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
trait ShimCometSparkSessionExtensions {
/**
- * TODO: delete after dropping Spark 3.2.0 support and directly call scan.pushedAggregate
- */
- def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = scan.getClass.getDeclaredFields
- .filter(_.getName == "pushedAggregate")
- .map { a => a.setAccessible(true); a }
- .flatMap(_.get(scan).asInstanceOf[Option[Aggregation]])
- .headOption
-
- /**
- * TODO: delete after dropping Spark 3.2 and 3.3 support
+ * TODO: delete after dropping Spark 3.3 support
*/
def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
index 2e1c681c..983e099f 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
trait ShimCometTakeOrderedAndProjectExec {
/**
- * TODO: delete after dropping Spark 3.2 and 3.3 support
+ * TODO: delete after dropping Spark 3.3 support
*/
protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = {
plan.getClass.getDeclaredFields
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
index b92d3fc6..1b0996d9 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
@@ -19,7 +19,7 @@
package org.apache.comet.shims
-import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression}
+import org.apache.spark.sql.catalyst.expressions.BinaryArithmetic
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
trait ShimQueryPlanSerde {
@@ -45,7 +45,7 @@ trait ShimQueryPlanSerde {
}
}
- // TODO: delete after drop Spark 3.2/3.3 support
+ // TODO: delete after drop Spark 3.3 support
// This method is used to check if the aggregate function is in legacy mode.
// EvalMode is an enum object in Spark 3.4.
def isLegacyMode(aggregate: DeclarativeAggregate): Boolean = {
@@ -62,9 +62,4 @@ trait ShimQueryPlanSerde {
"legacy".equalsIgnoreCase(evalMode.head.toString)
}
}
-
- // TODO: delete after drop Spark 3.2 support
- def isBloomFilterMightContain(binary: BinaryExpression): Boolean = {
- binary.getClass.getName == "org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain"
- }
}
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala
index c3d0c56e..579db51c 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala
@@ -28,7 +28,7 @@ trait ShimSQLConf {
* Spark 3.4 renamed parquetFilterPushDownStringStartWith to
* parquetFilterPushDownStringPredicate
*
- * TODO: delete after dropping Spark 3.2 & 3.3 support and simply use
+ * TODO: delete after dropping Spark 3.3 support and simply use
* parquetFilterPushDownStringPredicate
*/
protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
diff --git a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
index aede4795..afcf653b 100644
--- a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
trait ShimCometBroadcastExchangeExec {
- // TODO: remove after dropping Spark 3.2 and 3.3 support
+ // TODO: remove after dropping Spark 3.3 support
protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] = {
// Spark 3.4 has new API `broadcastInternal` to broadcast the relation without caching the
// unserialized object.
diff --git a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 02b97f9f..65fb59a3 100644
--- a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -21,32 +21,27 @@ package org.apache.spark.sql.comet.shims
import org.apache.comet.shims.ShimFileFormat
-import scala.language.implicitConversions
-
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
-import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{LongType, StructField, StructType}
trait ShimCometScanExec {
def wrapped: FileSourceScanExec
- // TODO: remove after dropping Spark 3.2 support and directly call wrapped.metadataColumns
+ // TODO: remove after dropping Spark 3.3 support
lazy val metadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods
.filter(_.getName == "metadataColumns")
.map { a => a.setAccessible(true); a }
.flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
- // TODO: remove after dropping Spark 3.2 and 3.3 support and directly call
+ // TODO: remove after dropping Spark 3.3 support and directly call
// wrapped.fileConstantMetadataColumns
lazy val fileConstantMetadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods
@@ -54,18 +49,7 @@ trait ShimCometScanExec {
.map { a => a.setAccessible(true); a }
.flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
- // TODO: remove after dropping Spark 3.2 support and directly call new DataSourceRDD
- protected def newDataSourceRDD(
- sc: SparkContext,
- inputPartitions: Seq[Seq[InputPartition]],
- partitionReaderFactory: PartitionReaderFactory,
- columnarReads: Boolean,
- customMetrics: Map[String, SQLMetric]): DataSourceRDD = {
- implicit def flattenSeq(p: Seq[Seq[InputPartition]]): Seq[InputPartition] = p.flatten
- new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads, customMetrics)
- }
-
- // TODO: remove after dropping Spark 3.2 support and directly call new FileScanRDD
+ // TODO: remove after dropping Spark 3.3 and 3.4 support and directly call new FileScanRDD
protected def newFileScanRDD(
fsRelation: HadoopFsRelation,
readFunction: PartitionedFile => Iterator[InternalRow],
@@ -74,10 +58,9 @@ trait ShimCometScanExec {
options: ParquetOptions): FileScanRDD =
classOf[FileScanRDD].getDeclaredConstructors
// Prevent to pick up incorrect constructors from any custom Spark forks.
- .filter(c => List(3, 5, 6).contains(c.getParameterCount()) )
+ .filter(c => List(5, 6).contains(c.getParameterCount()))
.map { c =>
c.getParameterCount match {
- case 3 => c.newInstance(fsRelation.sparkSession, readFunction, filePartitions)
case 5 =>
c.newInstance(fsRelation.sparkSession, readFunction, filePartitions, readSchema, metadataColumns)
case 6 =>
@@ -93,19 +76,15 @@ trait ShimCometScanExec {
.last
.asInstanceOf[FileScanRDD]
- // TODO: remove after dropping Spark 3.2 and 3.3 support and directly call
+ // TODO: remove after dropping Spark 3.3 support and directly call
// QueryExecutionErrors.SparkException
protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
- if (sparkVersion >= "3.3") {
- val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path)
- classOf[SparkException].getDeclaredConstructors
- .filter(_.getParameterCount == 3)
- .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
- .last
- .asInstanceOf[SparkException]
- } else { // Spark 3.2
- new IllegalStateException(s"Invalid bucket file ${path}")
- }
+ val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path)
else Array(path)
+ classOf[SparkException].getDeclaredConstructors
+ .filter(_.getParameterCount == 3)
+ .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
+ .last
+ .asInstanceOf[SparkException]
}
// Copied from Spark 3.4 RowIndexUtil due to PARQUET-2161 (tracked in SPARK-39634)
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
index 167b539f..d41502f6 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
@@ -19,16 +19,11 @@
package org.apache.comet.shims
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
trait ShimCometBatchScanExec {
def wrapped: BatchScanExec
- def keyGroupedPartitioning: Option[Seq[Expression]] = wrapped.keyGroupedPartitioning
-
- def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions
-
def ordering: Option[Seq[SortOrder]] = wrapped.ordering
}
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 543116c1..48b9c808 100644
--- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -24,15 +24,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.SparkContext
trait ShimCometScanExec {
def wrapped: FileSourceScanExec
@@ -40,14 +36,6 @@ trait ShimCometScanExec {
lazy val fileConstantMetadataColumns: Seq[AttributeReference] = wrapped.fileConstantMetadataColumns
- protected def newDataSourceRDD(
- sc: SparkContext,
- inputPartitions: Seq[Seq[InputPartition]],
- partitionReaderFactory: PartitionReaderFactory,
- columnarReads: Boolean,
- customMetrics: Map[String, SQLMetric]): DataSourceRDD =
- new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads, customMetrics)
-
protected def newFileScanRDD(
fsRelation: HadoopFsRelation,
readFunction: PartitionedFile => Iterator[InternalRow],
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 31d718d4..885d6385 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -571,9 +571,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("cast StringType to DateType") {
- // error message for invalid dates in Spark 3.2 not supported by Comet see below issue.
- // https://github.com/apache/datafusion-comet/issues/440
- assume(CometSparkSessionExtensions.isSpark33Plus)
val validDates = Seq(
"262142-01-01",
"262142-01-01 ",
@@ -956,7 +953,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
private def castTest(input: DataFrame, toType: DataType): Unit = {
- // we now support the TryCast expression in Spark 3.2 and 3.3
+ // we now support the TryCast expression in Spark 3.3
withTempPath { dir =>
val data = roundtripParquet(input, dir).coalesce(1)
data.createOrReplaceTempView("t")
@@ -1002,7 +999,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
} else if (CometSparkSessionExtensions.isSpark34Plus) {
// for Spark 3.4 we expect to reproduce the error message exactly
assert(cometMessage == sparkMessage)
- } else if (CometSparkSessionExtensions.isSpark33Plus) {
+ } else {
// for Spark 3.3 we just need to strip the prefix from the Comet message
// before comparing
val cometMessageModified = cometMessage
@@ -1015,19 +1012,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
} else {
assert(cometMessageModified == sparkMessage)
}
- } else {
- // for Spark 3.2 we just make sure we are seeing a similar type of error
- if (sparkMessage.contains("causes overflow")) {
- assert(cometMessage.contains("due to an overflow"))
- } else if (sparkMessage.contains("cannot be represented as")) {
- assert(cometMessage.contains("cannot be represented as"))
- } else {
- // assume that this is an invalid input message in the form:
- // `invalid input syntax for type numeric: -9223372036854775809`
- // we just check that the Comet message contains the same literal value
- val sparkInvalidValue = sparkMessage.substring(sparkMessage.indexOf(':') + 2)
- assert(cometMessage.contains(sparkInvalidValue))
- }
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 1c06e1da..8dbfb71b 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.sql.types.{Decimal, DecimalType}
-import org.apache.comet.CometSparkSessionExtensions.{isSpark32, isSpark33Plus, isSpark34Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus}
class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
@@ -51,7 +51,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("decimals divide by zero") {
- // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide operation
+ // TODO: enable Spark 3.3 tests after supporting decimal divide operation
assume(isSpark34Plus)
Seq(true, false).foreach { dictionary =>
@@ -293,7 +293,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("cast timestamp and timestamp_ntz to string") {
- // TODO: make the test pass for Spark 3.2 & 3.3
+ // TODO: make the test pass for Spark 3.3
assume(isSpark34Plus)
withSQLConf(
@@ -318,7 +318,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("cast timestamp and timestamp_ntz to long, date") {
- // TODO: make the test pass for Spark 3.2 & 3.3
+ // TODO: make the test pass for Spark 3.3
assume(isSpark34Plus)
withSQLConf(
@@ -411,7 +411,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("date_trunc with timestamp_ntz") {
-    assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2")
withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
@@ -619,8 +618,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("contains") {
- assume(!isSpark32)
-
val table = "names"
withTable(table) {
sql(s"create table $table(id int, name varchar(20)) using parquet")
@@ -637,8 +634,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("startswith") {
- assume(!isSpark32)
-
val table = "names"
withTable(table) {
sql(s"create table $table(id int, name varchar(20)) using parquet")
@@ -655,8 +650,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("endswith") {
- assume(!isSpark32)
-
val table = "names"
withTable(table) {
sql(s"create table $table(id int, name varchar(20)) using parquet")
@@ -693,7 +686,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("decimals arithmetic and comparison") {
-    // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder operation
+    // TODO: enable Spark 3.3 tests after supporting decimal remainder operation
assume(isSpark34Plus)
    def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
@@ -958,10 +951,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("round") {
-    assume(
-      !isSpark32,
-      "round function for Spark 3.2 does not allow negative target scale and has different result precision/scale for decimals")
-
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1177,11 +1166,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("unhex") {
-    // When running against Spark 3.2, we include a bug fix for https://issues.apache.org/jira/browse/SPARK-40924 that
-    // was added in Spark 3.3, so although Comet's behavior is more correct when running against Spark 3.2, it is not
-    // the same (and this only applies to edge cases with hex inputs with lengths that are not divisible by 2)
- assume(!isSpark32, "unhex function has incorrect behavior in 3.2")
-
val table = "unhex_table"
withTable(table) {
sql(s"create table $table(col string) using parquet")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index ca7bc7df..93be1111 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -800,7 +800,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("final decimal avg") {
- // TODO: enable decimal average for Spark 3.2 & 3.3
+ // TODO: enable decimal average for Spark 3.3
assume(isSpark34Plus)
withSQLConf(
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 0b37f5cc..bc18d8f1 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -184,7 +184,6 @@ abstract class ParquetReadSuite extends CometTestBase {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
      val expected = makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
- val useLocalDateTime = spark.version >= "3.3"
readParquetFile(path.toString) { df =>
checkAnswer(
df.select($"_0", $"_1", $"_2", $"_3", $"_4", $"_5"),
@@ -192,17 +191,11 @@ abstract class ParquetReadSuite extends CometTestBase {
case None =>
Row(null, null, null, null, null, null)
case Some(i) =>
-            // use `LocalDateTime` for `TimestampNTZType` with Spark 3.3 and above. At the moment,
-            // Spark reads Parquet timestamp values into `Timestamp` (with local timezone)
-            // regardless of whether `isAdjustedToUTC` is true or false. See SPARK-36182.
-            // TODO: make `LocalDateTime` default after dropping Spark 3.2.0 support
val ts = new java.sql.Timestamp(i)
- val ldt = if (useLocalDateTime) {
- ts.toLocalDateTime
- .atZone(ZoneId.systemDefault())
- .withZoneSameInstant(ZoneOffset.UTC)
- .toLocalDateTime
- } else ts
+ val ldt = ts.toLocalDateTime
+ .atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneOffset.UTC)
+ .toLocalDateTime
Row(ts, ts, ts, ldt, ts, ldt)
})
}
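With 3.2 gone, the LocalDateTime path above is now unconditional. A minimal,
self-contained sketch of the conversion (the epoch-millisecond value is an
arbitrary example):

    import java.time.{ZoneId, ZoneOffset}

    // Interpret the Timestamp in the JVM default zone, then re-express the
    // same instant as a UTC LocalDateTime, matching what Spark 3.3+ returns
    // for TimestampNTZType columns read from Parquet.
    val ts = new java.sql.Timestamp(1700000000000L)
    val ldt = ts.toLocalDateTime
      .atZone(ZoneId.systemDefault())
      .withZoneSameInstant(ZoneOffset.UTC)
      .toLocalDateTime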
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index ec87f19e..2638a0e6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
+import org.apache.spark.sql.test.TestSparkSession
import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
@@ -50,7 +50,7 @@ import org.apache.comet.shims.ShimCometTPCHQuerySuite
* ./mvnw -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test
* }}}
*/
-class CometTPCHQuerySuite extends QueryTest with CometTPCBase with ShimCometTPCHQuerySuite {
+class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuerySuite {
private val tpchDataPath = sys.env.get("SPARK_TPCH_DATA")
@@ -273,40 +273,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with ShimCometTPCH
ignore("skipped because env `SPARK_TPCH_DATA` is not set") {}
}
- // TODO: remove once Spark 3.2 & 3.3 is no longer supported
+ // TODO: remove once Spark 3.3 is no longer supported
private def shouldRegenerateGoldenFiles: Boolean =
System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
}
-
-/**
- * `TPCBase` doesn't exist in Spark 3.2. TODO: remove once Spark 3.2 is no longer supported
- */
-trait CometTPCBase extends SharedSparkSession {
- protected def injectStats: Boolean = false
-
- override protected def sparkConf: SparkConf = {
- if (injectStats) {
- super.sparkConf
- .set(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue)
- .set(SQLConf.CBO_ENABLED, true)
- .set(SQLConf.PLAN_STATS_ENABLED, true)
- .set(SQLConf.JOIN_REORDER_ENABLED, true)
- } else {
- super.sparkConf.set(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue)
- }
- }
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- createTables()
- }
-
- override def afterAll(): Unit = {
- dropTables()
- super.afterAll()
- }
-
- protected def createTables(): Unit
-
- protected def dropTables(): Unit
-}
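The deleted CometTPCBase was a stand-in for Spark's own TPCBase, which exists from
Spark 3.3 onward in package org.apache.spark.sql. A hedged sketch of the
replacement pattern (the suite name and table DDL below are hypothetical):

    package org.apache.spark.sql

    // Hypothetical suite: TPCBase supplies the SharedSparkSession wiring and
    // the createTables/dropTables lifecycle that CometTPCBase duplicated.
    class ExampleTPCSuite extends QueryTest with TPCBase {
      override protected def createTables(): Unit =
        spark.sql("CREATE TABLE IF NOT EXISTS lineitem (l_orderkey BIGINT) USING parquet")
      override protected def dropTables(): Unit =
        spark.sql("DROP TABLE IF EXISTS lineitem")
    }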
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index fc454944..b47de19b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -24,8 +24,8 @@ import java.io.File
import scala.collection.JavaConverters._
import scala.util.Random
+import org.apache.spark.TestUtils
import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.comet.shims.ShimTestUtils
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -124,7 +124,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
          (col: ColumnVector, i: Int) => longSum += col.getUTF8String(i).toLongExact
}
- val files = ShimTestUtils.listDirectory(new File(dir, "parquetV1"))
+ val files = TestUtils.listDirectory(new File(dir, "parquetV1"))
sqlBenchmark.addCase("ParquetReader Spark") { _ =>
files.map(_.asInstanceOf[String]).foreach { p =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 691d2cd6..5e9864f0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -292,7 +292,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
new TestSparkSession(new SparkContext("local[1]",
this.getClass.getCanonicalName, conf))
}
- // TODO: remove once Spark 3.2 & 3.3 is no longer supported
+ // TODO: remove once Spark 3.3 is no longer supported
private val shouldRegenerateGoldenFiles: Boolean =
System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
}
diff --git a/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala b/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala
deleted file mode 100644
index fcb543f9..00000000
--- a/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import java.io.File
-import scala.collection.mutable.ArrayBuffer
-
-object ShimTestUtils {
-
- /**
- * Spark 3.3.0 moved {{{SpecificParquetRecordReaderBase.listDirectory}}} to
- * {{{org.apache.spark.TestUtils.listDirectory}}}. Copying it here to bridge the difference
- * between Spark 3.2.0 and 3.3.0 TODO: remove after dropping Spark 3.2.0 support and use
- * {{{org.apache.spark.TestUtils.listDirectory}}}
- */
- def listDirectory(path: File): Array[String] = {
- val result = ArrayBuffer.empty[String]
- if (path.isDirectory) {
- path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
- } else {
- val c = path.getName.charAt(0)
- if (c != '.' && c != '_') result.append(path.getAbsolutePath)
- }
- result.toArray
- }
-}
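With the shim deleted, callers switch to Spark's own helper, as the
CometReadBenchmark hunk earlier in this diff shows. A hedged usage sketch (the
directory path below is hypothetical):

    import java.io.File
    import org.apache.spark.TestUtils

    // TestUtils.listDirectory is available in every Spark version Comet still
    // supports (3.3+), so the copied implementation is no longer needed.
    val files: Array[String] = TestUtils.listDirectory(new File("/tmp/parquetV1"))
    files.foreach(println)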
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]