This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new d584229c build: Drop Spark 3.2 support (#581)
d584229c is described below

commit d584229c8e7d9f04b9df2803958b87714356860b
Author: Huaxin Gao <[email protected]>
AuthorDate: Tue Jun 18 16:03:31 2024 -0700

    build: Drop Spark 3.2 support (#581)
    
    * build: Drop Spark 3.2 support
    
    * remove un-used import
    
    * fix BloomFilterMightContain
    
    * revert the changes for TimestampNTZType and PartitionIdPassthrough
    
    * address comments and remove more 3.2 related code
    
    * remove un-used import
    
    * put back newDataSourceRDD
    
    * remove un-used import and put back lazy val partitions
    
    * address comments
    
    * Trigger Build
    
    * remove the missed 3.2 pipeline
    
    * address comments
---
 .github/workflows/pr_build.yml                     | 15 +---
 .../apache/comet/parquet/CometParquetUtils.scala   |  3 +-
 .../comet/parquet/CometParquetReadSupport.scala    | 13 ++--
 .../CometSparkToParquetSchemaConverter.scala       |  5 +-
 .../org/apache/comet/shims/ShimBatchReader.scala   |  2 +-
 .../org/apache/comet/shims/ShimFileFormat.scala    |  7 +-
 .../comet/shims/ShimResolveDefaultColumns.scala    |  2 +-
 .../sql/comet/shims/ShimCometParquetUtils.scala    | 81 ----------------------
 .../sql/comet/shims/ShimCometParquetUtils.scala    | 38 ----------
 core/src/errors.rs                                 |  2 +-
 .../contributor-guide/adding_a_new_expression.md   |  9 +--
 docs/source/user-guide/installation.md             |  2 +-
 docs/source/user-guide/overview.md                 |  2 +-
 pom.xml                                            | 14 ----
 .../apache/comet/CometSparkSessionExtensions.scala | 12 +---
 .../org/apache/comet/parquet/ParquetFilters.scala  |  4 +-
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 18 ++---
 .../spark/sql/comet/CometBatchScanExec.scala       | 10 +--
 .../org/apache/spark/sql/comet/CometScanExec.scala |  6 +-
 .../shuffle/CometShuffleExchangeExec.scala         |  4 +-
 .../comet/plans/AliasAwareOutputExpression.scala   |  2 +-
 .../PartitioningPreservingUnaryExecNode.scala      |  2 +-
 .../org/apache/comet/shims/CometExprShim.scala     | 37 ----------
 .../comet/shims/ShimCometBatchScanExec.scala       | 14 +---
 .../shims/ShimCometBroadcastHashJoinExec.scala     |  4 +-
 .../comet/shims/ShimCometShuffleExchangeExec.scala |  2 +-
 .../shims/ShimCometSparkSessionExtensions.scala    | 13 +---
 .../shims/ShimCometTakeOrderedAndProjectExec.scala |  2 +-
 .../apache/comet/shims/ShimQueryPlanSerde.scala    |  9 +--
 .../org/apache/comet/shims/ShimSQLConf.scala       |  2 +-
 .../shims/ShimCometBroadcastExchangeExec.scala     |  2 +-
 .../spark/sql/comet/shims/ShimCometScanExec.scala  | 47 ++++---------
 .../comet/shims/ShimCometBatchScanExec.scala       |  7 +-
 .../spark/sql/comet/shims/ShimCometScanExec.scala  | 12 ----
 .../scala/org/apache/comet/CometCastSuite.scala    | 20 +-----
 .../org/apache/comet/CometExpressionSuite.scala    | 26 ++-----
 .../apache/comet/exec/CometAggregateSuite.scala    |  2 +-
 .../apache/comet/parquet/ParquetReadSuite.scala    | 15 ++--
 .../org/apache/spark/sql/CometTPCHQuerySuite.scala | 39 +----------
 .../spark/sql/benchmark/CometReadBenchmark.scala   |  4 +-
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |  2 +-
 .../apache/spark/comet/shims/ShimTestUtils.scala   | 43 ------------
 42 files changed, 87 insertions(+), 468 deletions(-)

diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index 2bf02335..981905ec 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -109,15 +109,8 @@ jobs:
         os: [ubuntu-latest]
         java_version: [8, 11, 17]
         test-target: [java]
-        spark-version: ['3.2', '3.3']
+        spark-version: ['3.3']
         scala-version: ['2.12', '2.13']
-        exclude:
-          - java_version: 17
-            spark-version: '3.2'
-          - java_version: 11
-            spark-version: '3.2'
-          - spark-version: '3.2'
-            scala-version: '2.13'
       fail-fast: false
     name: ${{ matrix.os }}/java ${{ matrix.java_version 
}}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ 
matrix.test-target }}
     runs-on: ${{ matrix.os }}
@@ -254,15 +247,11 @@ jobs:
       matrix:
         java_version: [8, 17]
         test-target: [java]
-        spark-version: ['3.2', '3.3']
+        spark-version: ['3.3']
         scala-version: ['2.12', '2.13']
         exclude:
-          - java_version: 17
-            spark-version: '3.2'
           - java_version: 8
             spark-version: '3.3'
-          - spark-version: '3.2'
-            scala-version: '2.13'
       fail-fast: false
     name: macos-14(Silicon)/java ${{ matrix.java_version 
}}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ 
matrix.test-target }}
     runs-on: macos-14
diff --git 
a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala 
b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala
index d03252d0..a37ec7e6 100644
--- a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala
+++ b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala
@@ -20,10 +20,9 @@
 package org.apache.comet.parquet
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
 import org.apache.spark.sql.internal.SQLConf
 
-object CometParquetUtils extends ShimCometParquetUtils {
+object CometParquetUtils {
   private val PARQUET_FIELD_ID_WRITE_ENABLED = 
"spark.sql.parquet.fieldId.write.enabled"
   private val PARQUET_FIELD_ID_READ_ENABLED = 
"spark.sql.parquet.fieldId.read.enabled"
   private val IGNORE_MISSING_PARQUET_FIELD_ID = 
"spark.sql.parquet.fieldId.read.ignoreMissing"
diff --git 
a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
 
b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
index 0e8a190c..4523a057 100644
--- 
a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
+++ 
b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometParquetReadSupport.scala
@@ -27,10 +27,9 @@ import org.apache.parquet.schema._
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.types._
 
-import org.apache.comet.parquet.CometParquetUtils
-
 /**
  * This class is copied & slightly modified from [[ParquetReadSupport]] in 
Spark. Changes:
  *   - This doesn't extend from Parquet's `ReadSupport` class since that is 
used for row-based
@@ -53,7 +52,7 @@ object CometParquetReadSupport {
       ignoreMissingIds: Boolean): MessageType = {
     if (!ignoreMissingIds &&
       !containsFieldIds(parquetSchema) &&
-      CometParquetUtils.hasFieldIds(catalystSchema)) {
+      ParquetUtils.hasFieldIds(catalystSchema)) {
       throw new RuntimeException(
         "Spark read schema expects field Ids, " +
           "but Parquet file schema doesn't contain any field Ids.\n" +
@@ -334,14 +333,14 @@ object CometParquetReadSupport {
     }
 
     def matchIdField(f: StructField): Type = {
-      val fieldId = CometParquetUtils.getFieldId(f)
+      val fieldId = ParquetUtils.getFieldId(f)
       idToParquetFieldMap
         .get(fieldId)
         .map { parquetTypes =>
           if (parquetTypes.size > 1) {
             // Need to fail if there is ambiguity, i.e. more than one field is 
matched
             val parquetTypesString = parquetTypes.map(_.getName).mkString("[", 
", ", "]")
-            throw 
CometParquetUtils.foundDuplicateFieldInFieldIdLookupModeError(
+            throw 
QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
               fieldId,
               parquetTypesString)
           } else {
@@ -355,9 +354,9 @@ object CometParquetReadSupport {
         }
     }
 
-    val shouldMatchById = useFieldId && 
CometParquetUtils.hasFieldIds(structType)
+    val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
     structType.map { f =>
-      if (shouldMatchById && CometParquetUtils.hasFieldId(f)) {
+      if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
         matchIdField(f)
       } else if (caseSensitive) {
         matchCaseSensitiveField(f)
diff --git 
a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
 
b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
index 2c8187e1..56adc460 100644
--- 
a/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
+++ 
b/common/src/main/scala/org/apache/spark/sql/comet/parquet/CometSparkToParquetSchemaConverter.scala
@@ -26,6 +26,7 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import 
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -66,8 +67,8 @@ class CometSparkToParquetSchemaConverter(
    */
   def convertField(field: StructField): Type = {
     val converted = convertField(field, if (field.nullable) OPTIONAL else 
REQUIRED)
-    if (useFieldId && CometParquetUtils.hasFieldId(field)) {
-      converted.withId(CometParquetUtils.getFieldId(field))
+    if (useFieldId && ParquetUtils.hasFieldId(field)) {
+      converted.withId(ParquetUtils.getFieldId(field))
     } else {
       converted
     }
diff --git 
a/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala 
b/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala
index ece4cfbe..18f91acc 100644
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimBatchReader.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.execution.datasources.PartitionedFile
 
 object ShimBatchReader {
 
-  // TODO: remove after dropping Spark 3.2 & 3.3 support and directly call 
PartitionedFile
+  // TODO: remove after dropping Spark 3.3 support and directly call 
PartitionedFile
   def newPartitionedFile(partitionValues: InternalRow, file: String): 
PartitionedFile =
     classOf[PartitionedFile].getDeclaredConstructors
       .map(c =>
diff --git 
a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala 
b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
index 5ab7eaf4..685e8f56 100644
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
@@ -21,15 +21,12 @@ package org.apache.comet.shims
 
 object ShimFileFormat {
 
-  // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use 
FileFormat.ROW_INDEX
+  // TODO: remove after dropping Spark 3.3 support and directly use 
FileFormat.ROW_INDEX
   val ROW_INDEX = "row_index"
 
   // A name for a temporary column that holds row indexes computed by the file 
format reader
   // until they can be placed in the _metadata struct.
-  // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use
+  // TODO: remove after dropping Spark 3.3 support and directly use
   //       FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
   val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"
-
-  // TODO: remove after dropping Spark 3.2 support and use 
FileFormat.OPTION_RETURNING_BATCH
-  val OPTION_RETURNING_BATCH = "returning_batch"
 }
diff --git 
a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
 
b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
index 8a30c8e0..4f7d4983 100644
--- 
a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
+++ 
b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
@@ -24,7 +24,7 @@ import scala.util.Try
 import org.apache.spark.sql.types.{StructField, StructType}
 
 object ShimResolveDefaultColumns {
-  // TODO: remove after dropping Spark 3.2 & 3.3 support and directly use 
ResolveDefaultColumns
+  // TODO: remove after dropping Spark 3.3 support and directly use 
ResolveDefaultColumns
   def getExistenceDefaultValue(field: StructField): Any =
     Try {
       // scalastyle:off classforname
diff --git 
a/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
 
b/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
deleted file mode 100644
index f22ac406..00000000
--- 
a/common/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.sql.types._
-
-trait ShimCometParquetUtils {
-  // The following is copied from QueryExecutionErrors
-  // TODO: remove after dropping Spark 3.2.0 support and directly use
-  //       QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError
-  def foundDuplicateFieldInFieldIdLookupModeError(
-      requiredId: Int,
-      matchedFields: String): Throwable = {
-    new RuntimeException(s"""
-         |Found duplicate field(s) "$requiredId": $matchedFields
-         |in id mapping mode
-     """.stripMargin.replaceAll("\n", " "))
-  }
-
-  // The followings are copied from 
org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
-  // TODO: remove after dropping Spark 3.2.0 support and directly use 
ParquetUtils
-  /**
-   * A StructField metadata key used to set the field id of a column in the 
Parquet schema.
-   */
-  val FIELD_ID_METADATA_KEY = "parquet.field.id"
-
-  /**
-   * Whether there exists a field in the schema, whether inner or leaf, has 
the parquet field ID
-   * metadata.
-   */
-  def hasFieldIds(schema: StructType): Boolean = {
-    def recursiveCheck(schema: DataType): Boolean = {
-      schema match {
-        case st: StructType =>
-          st.exists(field => hasFieldId(field) || 
recursiveCheck(field.dataType))
-
-        case at: ArrayType => recursiveCheck(at.elementType)
-
-        case mt: MapType => recursiveCheck(mt.keyType) || 
recursiveCheck(mt.valueType)
-
-        case _ =>
-          // No need to really check primitive types, just to terminate the 
recursion
-          false
-      }
-    }
-    if (schema.isEmpty) false else recursiveCheck(schema)
-  }
-
-  def hasFieldId(field: StructField): Boolean =
-    field.metadata.contains(FIELD_ID_METADATA_KEY)
-
-  def getFieldId(field: StructField): Int = {
-    require(
-      hasFieldId(field),
-      s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + 
field)
-    try {
-      Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
-    } catch {
-      case _: ArithmeticException | _: ClassCastException =>
-        throw new IllegalArgumentException(
-          s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer")
-    }
-  }
-}
diff --git 
a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
 
b/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
deleted file mode 100644
index d402cd78..00000000
--- 
a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometParquetUtils.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
-import org.apache.spark.sql.types._
-
-trait ShimCometParquetUtils {
-  def foundDuplicateFieldInFieldIdLookupModeError(
-      requiredId: Int,
-      matchedFields: String): Throwable = {
-    
QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(requiredId, 
matchedFields)
-  }
-
-  def hasFieldIds(schema: StructType): Boolean = 
ParquetUtils.hasFieldIds(schema)
-
-  def hasFieldId(field: StructField): Boolean = ParquetUtils.hasFieldId(field)
-
-  def getFieldId(field: StructField): Int = ParquetUtils.getFieldId (field)
-}
diff --git a/core/src/errors.rs b/core/src/errors.rs
index 493880c3..b38c5e90 100644
--- a/core/src/errors.rs
+++ b/core/src/errors.rs
@@ -61,7 +61,7 @@ pub enum CometError {
     Internal(String),
 
     // Note that this message format is based on Spark 3.4 and is more 
detailed than the message
-    // returned by Spark 3.2 or 3.3
+    // returned by Spark 3.3
     #[error("[CAST_INVALID_INPUT] The value '{value}' of the type 
\"{from_type}\" cannot be cast to \"{to_type}\" \
         because it is malformed. Correct the value as per the syntax, or 
change its target type. \
         Use `try_cast` to tolerate malformed input and return NULL instead. If 
necessary \
diff --git a/docs/source/contributor-guide/adding_a_new_expression.md 
b/docs/source/contributor-guide/adding_a_new_expression.md
index 6cf10c75..6d906c66 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -46,7 +46,7 @@ The `QueryPlanSerde` object has a method `exprToProto`, which 
is responsible for
 For example, the `unhex` function looks like this:
 
 ```scala
-case e: Unhex if !isSpark32 =>
+case e: Unhex =>
   val unHex = unhexSerde(e)
 
   val childExpr = exprToProtoInternal(unHex._1, inputs)
@@ -59,7 +59,6 @@ case e: Unhex if !isSpark32 =>
 
 A few things to note here:
 
-* The `isSpark32` check is used to fall back to Spark's implementation of 
`unhex` in Spark 3.2. This is somewhat context specific, because in this case, 
due to a bug in Spark 3.2 for `unhex`, we want to use the Spark implementation 
and not a Comet implementation that would behave differently if correct.
 * The function is recursively called on child expressions, so you'll need to 
make sure that the child expressions are also converted to protobuf.
 * `scalarExprToProtoWithReturnType` is for scalar functions that need return 
type information. Your expression may use a different method depending on the 
type of expression.
 
@@ -71,8 +70,6 @@ For example, this is the test case for the `unhex` expression:
 
 ```scala
 test("unhex") {
-  assume(!isSpark32, "unhex function has incorrect behavior in 3.2")  // used 
to skip the test in Spark 3.2
-
   val table = "unhex_table"
   withTable(table) {
     sql(s"create table $table(col string) using parquet")
@@ -172,11 +169,11 @@ pub(super) fn spark_unhex(args: &[ColumnarValue]) -> 
Result<ColumnarValue, DataF
 If the expression you're adding has different behavior across different Spark 
versions, you'll need to account for that in your implementation. There are two 
tools at your disposal to help with this:
 
 1. Shims that exist in 
`spark/src/main/spark-$SPARK_VERSION/org/apache/comet/shims/CometExprShim.scala`
 for each Spark version. These shims are used to provide compatibility between 
different Spark versions.
-2. Variables that correspond to the Spark version, such as `isSpark32`, which 
can be used to conditionally execute code based on the Spark version.
+2. Variables that correspond to the Spark version, such as `isSpark33Plus`, 
which can be used to conditionally execute code based on the Spark version.
 
 ## Shimming to Support Different Spark Versions
 
-By adding shims for each Spark version, you can provide a consistent interface 
for the expression across different Spark versions. For example, `unhex` added 
a new optional parameter is Spark 3.4, for if it should `failOnError` or not. 
So for version 3.2 and 3.3, the shim is:
+By adding shims for each Spark version, you can provide a consistent interface 
for the expression across different Spark versions. For example, `unhex` added 
a new optional parameter is Spark 3.4, for if it should `failOnError` or not. 
So for version 3.3, the shim is:
 
 ```scala
 trait CometExprShim {
diff --git a/docs/source/user-guide/installation.md 
b/docs/source/user-guide/installation.md
index 7335a488..bdf6c0e0 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -28,7 +28,7 @@ Make sure the following requirements are met and software 
installed on your mach
 
 ## Requirements
 
-- Apache Spark 3.2, 3.3, or 3.4
+- Apache Spark 3.3, or 3.4
 - JDK 8 and up
 - GLIBC 2.17 (Centos 7) and up
 
diff --git a/docs/source/user-guide/overview.md 
b/docs/source/user-guide/overview.md
index b5425d77..87f5f286 100644
--- a/docs/source/user-guide/overview.md
+++ b/docs/source/user-guide/overview.md
@@ -40,7 +40,7 @@ The following diagram illustrates the architecture of Comet:
 
 ## Current Status
 
-The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
+The project is currently integrated into Apache Spark 3.3, and 3.4.
 
 ## Feature Parity with Apache Spark
 
diff --git a/pom.xml b/pom.xml
index 788ee3d2..4407a9d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -517,20 +517,6 @@ under the License.
       </properties>
     </profile>
 
-    <profile>
-      <id>spark-3.2</id>
-      <properties>
-        <scala.version>2.12.15</scala.version>
-        <spark.version>3.2.2</spark.version>
-        <spark.version.short>3.2</spark.version.short>
-        <parquet.version>1.12.0</parquet.version>
-        <!-- we don't add special test suits for spark-3.2, so a not existed 
dir is specified-->
-        <additional.3_3.test.source>not-needed-yet</additional.3_3.test.source>
-        <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
-        <shims.minorVerSrc>spark-3.2</shims.minorVerSrc>
-      </properties>
-    </profile>
-
     <profile>
       <id>spark-3.3</id>
       <properties>
diff --git 
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala 
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 0136b62a..e939b43a 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -96,7 +96,7 @@ class CometSparkSessionExtensions
                 
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) &&
                 
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) 
&&
                 // Comet does not support pushedAggregate
-                
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isEmpty =>
+                
scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty =>
             val cometScan = 
CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
             logInfo("Comet extension enabled for Scan")
             CometBatchScanExec(
@@ -116,7 +116,7 @@ class CometSparkSessionExtensions
               s"Partition schema $readPartitionSchema is not supported")
             // Comet does not support pushedAggregate
             val info3 = createMessage(
-              
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
+              
scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined,
               "Comet does not support pushed aggregate")
             withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
             scanExec
@@ -992,8 +992,7 @@ object CometSparkSessionExtensions extends Logging {
     case BooleanType | ByteType | ShortType | IntegerType | LongType | 
FloatType | DoubleType |
         BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
       true
-    // `TimestampNTZType` is private in Spark 3.2.
-    case t: DataType if t.typeName == "timestamp_ntz" && !isSpark32 => true
+    case t: DataType if t.typeName == "timestamp_ntz" => true
     case dt =>
       logInfo(s"Comet extension is disabled because data type $dt is not 
supported")
       false
@@ -1015,11 +1014,6 @@ object CometSparkSessionExtensions extends Logging {
     }
   }
 
-  /** Used for operations that weren't available in Spark 3.2 */
-  def isSpark32: Boolean = {
-    org.apache.spark.SPARK_VERSION.matches("3\\.2\\..*")
-  }
-
   def isSpark33Plus: Boolean = {
     org.apache.spark.SPARK_VERSION >= "3.3"
   }
diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala 
b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index 58c2aeb4..17844aba 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -45,8 +45,8 @@ import 
org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 import org.apache.comet.shims.ShimSQLConf
 
 /**
- * Copied from Spark 3.2 & 3.4, in order to fix Parquet shading issue. TODO: 
find a way to remove
- * this duplication
+ * Copied from Spark 3.4, in order to fix Parquet shading issue. TODO: find a 
way to remove this
+ * duplication
  *
  * Some utility function to convert Spark data source filters to Parquet 
filters.
  */
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7c4f5f25..67ecfe52 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, 
isCometScan, isSpark32, isSpark34Plus, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, 
isCometScan, isSpark34Plus, withInfo}
 import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, 
Incompatible, Unsupported}
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => 
ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, 
DecimalInfo, ListInfo, MapInfo, StructInfo}
@@ -63,7 +63,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde 
with CometExprShim
         _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: 
DecimalType |
         _: DateType | _: BooleanType | _: NullType =>
       true
-    // `TimestampNTZType` is private in Spark 3.2.
     case dt if dt.typeName == "timestamp_ntz" => true
     case dt =>
       emitWarning(s"unsupported Spark data type: $dt")
@@ -1413,7 +1412,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
           }
 
         case UnaryExpression(child) if expr.prettyName == "promote_precision" 
=>
-          // `UnaryExpression` includes `PromotePrecision` for Spark 3.2 & 3.3
+          // `UnaryExpression` includes `PromotePrecision` for Spark 3.3
           // `PromotePrecision` is just a wrapper, don't need to serialize it.
           exprToProtoInternal(child, inputs)
 
@@ -1518,7 +1517,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
 
           optExprWithInfo(optExpr, expr, child)
 
-        case e: Unhex if !isSpark32 =>
+        case e: Unhex =>
           val unHex = unhexSerde(e)
 
           val childExpr = exprToProtoInternal(unHex._1, inputs)
@@ -1585,9 +1584,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
           val optExpr = scalarExprToProto("pow", leftExpr, rightExpr)
           optExprWithInfo(optExpr, expr, left, right)
 
-        // round function for Spark 3.2 does not allow negative round target 
scale. In addition,
-        // it has different result precision/scale for decimals. Supporting 
only 3.3 and above.
-        case r: Round if !isSpark32 =>
+        case r: Round =>
           // _scale s a constant, copied from Spark's RoundBase because it is 
a protected val
           val scaleV: Any = r.scale.eval(EmptyRow)
           val _scale: Int = scaleV.asInstanceOf[Int]
@@ -2066,7 +2063,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
             childExpr)
           optExprWithInfo(optExpr, expr, child)
 
-        case b @ BinaryExpression(_, _) if isBloomFilterMightContain(b) =>
+        case b @ BloomFilterMightContain(_, _) =>
           val bloomFilter = b.left
           val value = b.right
           val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs)
@@ -2244,7 +2241,7 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: 
FloatType |
         _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: 
BooleanType =>
       true
-    // `TimestampNTZType` is private in Spark 3.2/3.3.
+    // `TimestampNTZType` is private in Spark 3.3.
     case dt if dt.typeName == "timestamp_ntz" => true
     case _ => false
   }
@@ -2322,12 +2319,9 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
         if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
           val limitBuilder = OperatorOuterClass.Limit.newBuilder()
 
-          // Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark 
versions
-          // support it. Before we upgrade to Spark 3.3, just set it zero.
           // TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
           // When we upgrade to Spark 3.3., we need to address it here.
           limitBuilder.setLimit(globalLimitExec.limit)
-          limitBuilder.setOffset(0)
 
           Some(result.setLimit(limitBuilder).build())
         } else {
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index d6c3c87a..82ebed95 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -44,6 +44,10 @@ case class CometBatchScanExec(wrapped: BatchScanExec, 
runtimeFilters: Seq[Expres
 
   wrapped.logicalLink.foreach(setLogicalLink)
 
+  def keyGroupedPartitioning: Option[Seq[Expression]] = 
wrapped.keyGroupedPartitioning
+
+  def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions
+
   override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -144,11 +148,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, 
runtimeFilters: Seq[Expres
     }
   }
 
-  // Intentionally omitting the return type as it is different depending on 
Spark version
-  // Spark 3.2.x Seq[InputPartition]
-  // Spark 3.3.x Seq[Seq[InputPartition]]
-  // TODO: add back the return type after dropping Spark 3.2.0 support
-  @transient override lazy val partitions = wrappedScan.partitions
+  @transient override lazy val partitions: Seq[Seq[InputPartition]] = 
wrappedScan.partitions
 
   override def supportsColumnar: Boolean = true
 }
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 9a5b55d6..ca99e36b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.shims.ShimCometScanExec
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -44,7 +45,6 @@ import org.apache.spark.util.collection._
 
 import org.apache.comet.{CometConf, MetricsSupport}
 import org.apache.comet.parquet.{CometParquetFileFormat, 
CometParquetPartitionReaderFactory}
-import org.apache.comet.shims.ShimFileFormat
 
 /**
  * Comet physical scan node for DataSource V1. Most of the code here follow 
Spark's
@@ -150,7 +150,7 @@ case class CometScanExec(
 
   lazy val inputRDD: RDD[InternalRow] = {
     val options = relation.options +
-      (ShimFileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
+      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -402,7 +402,7 @@ case class CometScanExec(
         new ParquetOptions(CaseInsensitiveMap(relation.options), sqlConf),
         metrics)
 
-      newDataSourceRDD(
+      new DataSourceRDD(
         fsRelation.sparkSession.sparkContext,
         partitions.map(Seq(_)),
         partitionReaderFactory,
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 3f4d7bfd..aabe3c35 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -583,14 +583,14 @@ class CometShuffleWriteProcessor(
 }
 
 /**
- * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.2.
+ * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.3.
  */
 private[spark] class PartitionIdPassthrough(override val numPartitions: Int) 
extends Partitioner {
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
 /**
- * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.2.
+ * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.3.
  */
 private[spark] class ConstantPartitioner extends Partitioner {
   override def numPartitions: Int = 1
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
index 996526e5..f28ca2bb 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
@@ -88,7 +88,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
     }
   }
 
-  // Copied from Spark 3.4+ to make it available in Spark 3.2+.
+  // Copied from Spark 3.4+ to make it available in Spark 3.3+.
   def multiTransformDown(expr: Expression)(
       rule: PartialFunction[Expression, Seq[Expression]]): Stream[Expression] 
= {
 
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
index 8c6f0af1..d584e860 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
  * satisfies distribution requirements.
  *
  * This is copied from Spark's `PartitioningPreservingUnaryExecNode` because 
it is only available
- * in Spark 3.4+. This is a workaround to make it available in Spark 3.2+.
+ * in Spark 3.4+. This is a workaround to make it available in Spark 3.3+.
  */
 trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with 
AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
diff --git 
a/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala 
b/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala
deleted file mode 100644
index 2c6f6ccf..00000000
--- a/spark/src/main/spark-3.2/org/apache/comet/shims/CometExprShim.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.comet.shims
-
-import org.apache.comet.expressions.CometEvalMode
-import org.apache.spark.sql.catalyst.expressions._
-
-/**
- * `CometExprShim` acts as a shim for for parsing expressions from different 
Spark versions.
- */
-trait CometExprShim {
-    /**
-     * Returns a tuple of expressions for the `unhex` function.
-     */
-    protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
-        (unhex.child, Literal(false))
-    }
-
-    protected def evalMode(c: Cast): CometEvalMode.Value = 
CometEvalMode.fromBoolean(c.ansiEnabled)
-}
-
diff --git 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
index 9e7cc3ba..49e93111 100644
--- 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
@@ -19,24 +19,12 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 trait ShimCometBatchScanExec {
   def wrapped: BatchScanExec
 
-  // Only for Spark 3.3+
-  def keyGroupedPartitioning: Option[Seq[Expression]] = 
wrapped.getClass.getDeclaredMethods
-    .filter(_.getName == "keyGroupedPartitioning")
-    .flatMap(_.invoke(wrapped).asInstanceOf[Option[Seq[Expression]]])
-    .headOption
-
-  // Only for Spark 3.3+
-  def inputPartitions: Seq[InputPartition] = 
wrapped.getClass.getDeclaredMethods
-    .filter(_.getName == "inputPartitions")
-    .flatMap(_.invoke(wrapped).asInstanceOf[Seq[InputPartition]])
-
   // Only for Spark 3.4+
   def ordering: Option[Seq[SortOrder]] = wrapped.getClass.getDeclaredMethods
     .filter(_.getName == "ordering")
diff --git 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
index eef0ee9d..442bb9e5 100644
--- 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
@@ -27,9 +27,9 @@ trait ShimCometBroadcastHashJoinExec {
   /**
    * Returns the expressions that are used for hash partitioning including 
`HashPartitioning` and
    * `CoalescedHashPartitioning`. They shares same trait 
`HashPartitioningLike` since Spark 3.4,
-   * but Spark 3.2/3.3 doesn't have `HashPartitioningLike` and 
`CoalescedHashPartitioning`.
+   * but Spark 3.3 doesn't have `HashPartitioningLike` and 
`CoalescedHashPartitioning`.
    *
-   * TODO: remove after dropping Spark 3.2 and 3.3 support.
+   * TODO: remove after dropping Spark 3.3 support.
    */
   def getHashPartitioningLikeExpressions(partitioning: Partitioning): 
Seq[Expression] = {
     partitioning.getClass.getDeclaredMethods
diff --git 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
index 350aeb9f..965b6851 100644
--- 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
@@ -26,7 +26,7 @@ import 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.types.{StructField, StructType}
 
 trait ShimCometShuffleExchangeExec {
-  // TODO: remove after dropping Spark 3.2 and 3.3 support
+  // TODO: remove after dropping Spark 3.3 support
   def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): 
CometShuffleExchangeExec = {
     val advisoryPartitionSize = s.getClass.getDeclaredMethods
       .filter(_.getName == "advisoryPartitionSize")
diff --git 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 37748533..c8aeacf2 100644
--- 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -19,22 +19,11 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 
 trait ShimCometSparkSessionExtensions {
   /**
-   * TODO: delete after dropping Spark 3.2.0 support and directly call 
scan.pushedAggregate
-   */
-  def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = 
scan.getClass.getDeclaredFields
-    .filter(_.getName == "pushedAggregate")
-    .map { a => a.setAccessible(true); a }
-    .flatMap(_.get(scan).asInstanceOf[Option[Aggregation]])
-    .headOption
-
-  /**
-   * TODO: delete after dropping Spark 3.2 and 3.3 support
+   * TODO: delete after dropping Spark 3.3 support
    */
   def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
 
diff --git 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
index 2e1c681c..983e099f 100644
--- 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.execution.TakeOrderedAndProjectExec
 trait ShimCometTakeOrderedAndProjectExec {
 
   /**
-   * TODO: delete after dropping Spark 3.2 and 3.3 support
+   * TODO: delete after dropping Spark 3.3 support
    */
   protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = {
     plan.getClass.getDeclaredFields
diff --git 
a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
index b92d3fc6..1b0996d9 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
@@ -19,7 +19,7 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, 
BinaryExpression}
+import org.apache.spark.sql.catalyst.expressions.BinaryArithmetic
 import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
 
 trait ShimQueryPlanSerde {
@@ -45,7 +45,7 @@ trait ShimQueryPlanSerde {
     }
   }
 
-  // TODO: delete after drop Spark 3.2/3.3 support
+  // TODO: delete after drop Spark 3.3 support
   // This method is used to check if the aggregate function is in legacy mode.
   // EvalMode is an enum object in Spark 3.4.
   def isLegacyMode(aggregate: DeclarativeAggregate): Boolean = {
@@ -62,9 +62,4 @@ trait ShimQueryPlanSerde {
       "legacy".equalsIgnoreCase(evalMode.head.toString)
     }
   }
-
-  // TODO: delete after drop Spark 3.2 support
-  def isBloomFilterMightContain(binary: BinaryExpression): Boolean = {
-    binary.getClass.getName == 
"org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain"
-  }
 }
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala 
b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala
index c3d0c56e..579db51c 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimSQLConf.scala
@@ -28,7 +28,7 @@ trait ShimSQLConf {
    * Spark 3.4 renamed parquetFilterPushDownStringStartWith to
    * parquetFilterPushDownStringPredicate
    *
-   * TODO: delete after dropping Spark 3.2 & 3.3 support and simply use
+   * TODO: delete after dropping Spark 3.3 support and simply use
    * parquetFilterPushDownStringPredicate
    */
   protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
diff --git 
a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
 
b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
index aede4795..afcf653b 100644
--- 
a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 
 trait ShimCometBroadcastExchangeExec {
-  // TODO: remove after dropping Spark 3.2 and 3.3 support
+  // TODO: remove after dropping Spark 3.3 support
   protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: 
T): Broadcast[Any] = {
     // Spark 3.4 has new API `broadcastInternal` to broadcast the relation 
without caching the
     // unserialized object.
diff --git 
a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
 
b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 02b97f9f..65fb59a3 100644
--- 
a/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ 
b/spark/src/main/spark-3.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -21,32 +21,27 @@ package org.apache.spark.sql.comet.shims
 
 import org.apache.comet.shims.ShimFileFormat
 
-import scala.language.implicitConversions
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
HadoopFsRelation, PartitionDirectory, PartitionedFile}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{LongType, StructField, StructType}
 
 trait ShimCometScanExec {
   def wrapped: FileSourceScanExec
 
-  // TODO: remove after dropping Spark 3.2 support and directly call 
wrapped.metadataColumns
+  // TODO: remove after dropping Spark 3.3 support
   lazy val metadataColumns: Seq[AttributeReference] = 
wrapped.getClass.getDeclaredMethods
     .filter(_.getName == "metadataColumns")
     .map { a => a.setAccessible(true); a }
     .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
 
-  // TODO: remove after dropping Spark 3.2 and 3.3 support and directly call
+  // TODO: remove after dropping Spark 3.3 support and directly call
   //       wrapped.fileConstantMetadataColumns
   lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
     wrapped.getClass.getDeclaredMethods
@@ -54,18 +49,7 @@ trait ShimCometScanExec {
       .map { a => a.setAccessible(true); a }
       .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
 
-  // TODO: remove after dropping Spark 3.2 support and directly call new 
DataSourceRDD
-  protected def newDataSourceRDD(
-      sc: SparkContext,
-      inputPartitions: Seq[Seq[InputPartition]],
-      partitionReaderFactory: PartitionReaderFactory,
-      columnarReads: Boolean,
-      customMetrics: Map[String, SQLMetric]): DataSourceRDD = {
-    implicit def flattenSeq(p: Seq[Seq[InputPartition]]): Seq[InputPartition] 
= p.flatten
-    new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, 
columnarReads, customMetrics)
-  }
-
-  // TODO: remove after dropping Spark 3.2 support and directly call new 
FileScanRDD
+  // TODO: remove after dropping Spark 3.3 and 3.4 support and directly call 
new FileScanRDD
   protected def newFileScanRDD(
       fsRelation: HadoopFsRelation,
       readFunction: PartitionedFile => Iterator[InternalRow],
@@ -74,10 +58,9 @@ trait ShimCometScanExec {
       options: ParquetOptions): FileScanRDD =
     classOf[FileScanRDD].getDeclaredConstructors
       // Prevent to pick up incorrect constructors from any custom Spark forks.
-      .filter(c => List(3, 5, 6).contains(c.getParameterCount()) )
+      .filter(c => List(5, 6).contains(c.getParameterCount()))
       .map { c =>
         c.getParameterCount match {
-          case 3 => c.newInstance(fsRelation.sparkSession, readFunction, 
filePartitions)
           case 5 =>
             c.newInstance(fsRelation.sparkSession, readFunction, 
filePartitions, readSchema, metadataColumns)
           case 6 =>
@@ -93,19 +76,15 @@ trait ShimCometScanExec {
       .last
       .asInstanceOf[FileScanRDD]
 
-  // TODO: remove after dropping Spark 3.2 and 3.3 support and directly call
+  // TODO: remove after dropping Spark 3.3 support and directly call
   //       QueryExecutionErrors.SparkException
   protected def invalidBucketFile(path: String, sparkVersion: String): 
Throwable = {
-    if (sparkVersion >= "3.3") {
-      val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) 
else Array(path)
-      classOf[SparkException].getDeclaredConstructors
-        .filter(_.getParameterCount == 3)
-        .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
-        .last
-        .asInstanceOf[SparkException]
-    } else { // Spark 3.2
-      new IllegalStateException(s"Invalid bucket file ${path}")
-    }
+    val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) 
else Array(path)
+    classOf[SparkException].getDeclaredConstructors
+      .filter(_.getParameterCount == 3)
+      .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
+      .last
+      .asInstanceOf[SparkException]
   }
 
   // Copied from Spark 3.4 RowIndexUtil due to PARQUET-2161 (tracked in 
SPARK-39634)
diff --git 
a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala 
b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
index 167b539f..d41502f6 100644
--- 
a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ 
b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
@@ -19,16 +19,11 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
-import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 trait ShimCometBatchScanExec {
   def wrapped: BatchScanExec
 
-  def keyGroupedPartitioning: Option[Seq[Expression]] = 
wrapped.keyGroupedPartitioning
-
-  def inputPartitions: Seq[InputPartition] = wrapped.inputPartitions
-
   def ordering: Option[Seq[SortOrder]] = wrapped.ordering
 }
diff --git 
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
 
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 543116c1..48b9c808 100644
--- 
a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ 
b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -24,15 +24,11 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.SparkContext
 
 trait ShimCometScanExec {
   def wrapped: FileSourceScanExec
@@ -40,14 +36,6 @@ trait ShimCometScanExec {
   lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
     wrapped.fileConstantMetadataColumns
 
-  protected def newDataSourceRDD(
-      sc: SparkContext,
-      inputPartitions: Seq[Seq[InputPartition]],
-      partitionReaderFactory: PartitionReaderFactory,
-      columnarReads: Boolean,
-      customMetrics: Map[String, SQLMetric]): DataSourceRDD =
-    new DataSourceRDD(sc, inputPartitions, partitionReaderFactory, 
columnarReads, customMetrics)
-
   protected def newFileScanRDD(
       fsRelation: HadoopFsRelation,
       readFunction: PartitionedFile => Iterator[InternalRow],
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 31d718d4..885d6385 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -571,9 +571,6 @@ class CometCastSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("cast StringType to DateType") {
-    // error message for invalid dates in Spark 3.2 not supported by Comet see 
below issue.
-    // https://github.com/apache/datafusion-comet/issues/440
-    assume(CometSparkSessionExtensions.isSpark33Plus)
     val validDates = Seq(
       "262142-01-01",
       "262142-01-01 ",
@@ -956,7 +953,7 @@ class CometCastSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
 
   private def castTest(input: DataFrame, toType: DataType): Unit = {
 
-    // we now support the TryCast expression in Spark 3.2 and 3.3
+    // we now support the TryCast expression in Spark 3.3
     withTempPath { dir =>
       val data = roundtripParquet(input, dir).coalesce(1)
       data.createOrReplaceTempView("t")
@@ -1002,7 +999,7 @@ class CometCastSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
             } else if (CometSparkSessionExtensions.isSpark34Plus) {
               // for Spark 3.4 we expect to reproduce the error message exactly
               assert(cometMessage == sparkMessage)
-            } else if (CometSparkSessionExtensions.isSpark33Plus) {
+            } else {
               // for Spark 3.3 we just need to strip the prefix from the Comet 
message
               // before comparing
               val cometMessageModified = cometMessage
@@ -1015,19 +1012,6 @@ class CometCastSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
               } else {
                 assert(cometMessageModified == sparkMessage)
               }
-            } else {
-              // for Spark 3.2 we just make sure we are seeing a similar type 
of error
-              if (sparkMessage.contains("causes overflow")) {
-                assert(cometMessage.contains("due to an overflow"))
-              } else if (sparkMessage.contains("cannot be represented as")) {
-                assert(cometMessage.contains("cannot be represented as"))
-              } else {
-                // assume that this is an invalid input message in the form:
-                // `invalid input syntax for type numeric: 
-9223372036854775809`
-                // we just check that the Comet message contains the same 
literal value
-                val sparkInvalidValue = 
sparkMessage.substring(sparkMessage.indexOf(':') + 2)
-                assert(cometMessage.contains(sparkInvalidValue))
-              }
             }
         }
 
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 1c06e1da..8dbfb71b 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types.{Decimal, DecimalType}
 
-import org.apache.comet.CometSparkSessionExtensions.{isSpark32, isSpark33Plus, 
isSpark34Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, 
isSpark34Plus}
 
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -51,7 +51,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("decimals divide by zero") {
-    // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide 
operation
+    // TODO: enable Spark 3.3 tests after supporting decimal divide operation
     assume(isSpark34Plus)
 
     Seq(true, false).foreach { dictionary =>
@@ -293,7 +293,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("cast timestamp and timestamp_ntz to string") {
-    // TODO: make the test pass for Spark 3.2 & 3.3
+    // TODO: make the test pass for Spark 3.3
     assume(isSpark34Plus)
 
     withSQLConf(
@@ -318,7 +318,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("cast timestamp and timestamp_ntz to long, date") {
-    // TODO: make the test pass for Spark 3.2 & 3.3
+    // TODO: make the test pass for Spark 3.3
     assume(isSpark34Plus)
 
     withSQLConf(
@@ -411,7 +411,6 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("date_trunc with timestamp_ntz") {
-    assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect 
behavior in 3.2")
     withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
@@ -619,8 +618,6 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("contains") {
-    assume(!isSpark32)
-
     val table = "names"
     withTable(table) {
       sql(s"create table $table(id int, name varchar(20)) using parquet")
@@ -637,8 +634,6 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("startswith") {
-    assume(!isSpark32)
-
     val table = "names"
     withTable(table) {
       sql(s"create table $table(id int, name varchar(20)) using parquet")
@@ -655,8 +650,6 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("endswith") {
-    assume(!isSpark32)
-
     val table = "names"
     withTable(table) {
       sql(s"create table $table(id int, name varchar(20)) using parquet")
@@ -693,7 +686,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("decimals arithmetic and comparison") {
-    // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder 
operation
+    // TODO: enable Spark 3.3 tests after supporting decimal reminder operation
     assume(isSpark34Plus)
 
     def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: 
Boolean): DataFrame = {
@@ -958,10 +951,6 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("round") {
-    assume(
-      !isSpark32,
-      "round function for Spark 3.2 does not allow negative target scale and 
has different result precision/scale for decimals")
-
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1177,11 +1166,6 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("unhex") {
-    // When running against Spark 3.2, we include a bug fix for 
https://issues.apache.org/jira/browse/SPARK-40924 that
-    // was added in Spark 3.3, so although Comet's behavior is more correct 
when running against Spark 3.2, it is not
-    // the same (and this only applies to edge cases with hex inputs with 
lengths that are not divisible by 2)
-    assume(!isSpark32, "unhex function has incorrect behavior in 3.2")
-
     val table = "unhex_table"
     withTable(table) {
       sql(s"create table $table(col string) using parquet")
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index ca7bc7df..93be1111 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -800,7 +800,7 @@ class CometAggregateSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("final decimal avg") {
-    // TODO: enable decimal average for Spark 3.2 & 3.3
+    // TODO: enable decimal average for Spark 3.3
     assume(isSpark34Plus)
 
     withSQLConf(
diff --git 
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala 
b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 0b37f5cc..bc18d8f1 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -184,7 +184,6 @@ abstract class ParquetReadSuite extends CometTestBase {
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "part-r-0.parquet")
         val expected = makeRawTimeParquetFile(path, dictionaryEnabled = 
dictionaryEnabled, 10000)
-        val useLocalDateTime = spark.version >= "3.3"
         readParquetFile(path.toString) { df =>
           checkAnswer(
             df.select($"_0", $"_1", $"_2", $"_3", $"_4", $"_5"),
@@ -192,17 +191,11 @@ abstract class ParquetReadSuite extends CometTestBase {
               case None =>
                 Row(null, null, null, null, null, null)
               case Some(i) =>
-                // use `LocalDateTime` for `TimestampNTZType` with Spark 3.3 
and above. At the moment,
-                // Spark reads Parquet timestamp values into `Timestamp` (with 
local timezone)
-                // regardless of whether `isAdjustedToUTC` is true or false. 
See SPARK-36182.
-                // TODO: make `LocalDateTime` default after dropping Spark 
3.2.0 support
                 val ts = new java.sql.Timestamp(i)
-                val ldt = if (useLocalDateTime) {
-                  ts.toLocalDateTime
-                    .atZone(ZoneId.systemDefault())
-                    .withZoneSameInstant(ZoneOffset.UTC)
-                    .toLocalDateTime
-                } else ts
+                val ldt = ts.toLocalDateTime
+                  .atZone(ZoneId.systemDefault())
+                  .withZoneSameInstant(ZoneOffset.UTC)
+                  .toLocalDateTime
                 Row(ts, ts, ts, ldt, ts, ldt)
             })
         }
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index ec87f19e..2638a0e6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, 
stringToFile}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
+import org.apache.spark.sql.test.TestSparkSession
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
@@ -50,7 +50,7 @@ import org.apache.comet.shims.ShimCometTPCHQuerySuite
  *     ./mvnw -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test
  * }}}
  */
-class CometTPCHQuerySuite extends QueryTest with CometTPCBase with 
ShimCometTPCHQuerySuite {
+class CometTPCHQuerySuite extends QueryTest with TPCBase with 
ShimCometTPCHQuerySuite {
 
   private val tpchDataPath = sys.env.get("SPARK_TPCH_DATA")
 
@@ -273,40 +273,7 @@ class CometTPCHQuerySuite extends QueryTest with 
CometTPCBase with ShimCometTPCH
     ignore("skipped because env `SPARK_TPCH_DATA` is not set") {}
   }
 
-  // TODO: remove once Spark 3.2 & 3.3 is no longer supported
+  // TODO: remove once Spark 3.3 is no longer supported
   private def shouldRegenerateGoldenFiles: Boolean =
     System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
 }
-
-/**
- * `TPCBase` doesn't exist in Spark 3.2. TODO: remove once Spark 3.2 is no 
longer supported
- */
-trait CometTPCBase extends SharedSparkSession {
-  protected def injectStats: Boolean = false
-
-  override protected def sparkConf: SparkConf = {
-    if (injectStats) {
-      super.sparkConf
-        .set(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue)
-        .set(SQLConf.CBO_ENABLED, true)
-        .set(SQLConf.PLAN_STATS_ENABLED, true)
-        .set(SQLConf.JOIN_REORDER_ENABLED, true)
-    } else {
-      super.sparkConf.set(SQLConf.MAX_TO_STRING_FIELDS, Int.MaxValue)
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    createTables()
-  }
-
-  override def afterAll(): Unit = {
-    dropTables()
-    super.afterAll()
-  }
-
-  protected def createTables(): Unit
-
-  protected def dropTables(): Unit
-}
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index fc454944..b47de19b 100644
--- 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -24,8 +24,8 @@ import java.io.File
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import org.apache.spark.TestUtils
 import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.comet.shims.ShimTestUtils
 import 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
@@ -124,7 +124,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
             (col: ColumnVector, i: Int) => longSum += 
col.getUTF8String(i).toLongExact
         }
 
-        val files = ShimTestUtils.listDirectory(new File(dir, "parquetV1"))
+        val files = TestUtils.listDirectory(new File(dir, "parquetV1"))
 
         sqlBenchmark.addCase("ParquetReader Spark") { _ =>
           files.map(_.asInstanceOf[String]).foreach { p =>
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 691d2cd6..5e9864f0 100644
--- 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -292,7 +292,7 @@ trait CometPlanStabilitySuite extends 
DisableAdaptiveExecutionSuite with TPCDSBa
     new TestSparkSession(new SparkContext("local[1]", 
this.getClass.getCanonicalName, conf))
   }
 
-  // TODO: remove once Spark 3.2 & 3.3 is no longer supported
+  // TODO: remove once Spark 3.3 is no longer supported
   private val shouldRegenerateGoldenFiles: Boolean =
     System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
 }
diff --git 
a/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala 
b/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala
deleted file mode 100644
index fcb543f9..00000000
--- a/spark/src/test/spark-3.x/org/apache/spark/comet/shims/ShimTestUtils.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import java.io.File
-import scala.collection.mutable.ArrayBuffer
-
-object ShimTestUtils {
-
-  /**
-   * Spark 3.3.0 moved {{{SpecificParquetRecordReaderBase.listDirectory}}} to
-   * {{{org.apache.spark.TestUtils.listDirectory}}}. Copying it here to bridge 
the difference
-   * between Spark 3.2.0 and 3.3.0 TODO: remove after dropping Spark 3.2.0 
support and use
-   * {{{org.apache.spark.TestUtils.listDirectory}}}
-   */
-  def listDirectory(path: File): Array[String] = {
-    val result = ArrayBuffer.empty[String]
-    if (path.isDirectory) {
-      path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
-    } else {
-      val c = path.getName.charAt(0)
-      if (c != '.' && c != '_') result.append(path.getAbsolutePath)
-    }
-    result.toArray
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to