This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new b8954029c chore: [FOLLOWUP] Drop support for Spark 3.3 (EOL) (#1534)
b8954029c is described below

commit b8954029c9d205d10206df8772c8677752e92155
Author: KAZUYUKI TANIMURA <[email protected]>
AuthorDate: Mon Mar 17 11:13:02 2025 -0700

    chore: [FOLLOWUP] Drop support for Spark 3.3 (EOL) (#1534)
---
 common/pom.xml                                     |  1 -
 .../apache/comet/parquet/ConstantColumnReader.java |  7 +-
 .../spark/sql/comet/CastOverflowException.scala    | 14 +++-
 .../org/apache/comet/shims/ShimFileFormat.scala}   | 14 ++--
 .../comet/shims/ShimCastOverflowException.scala    | 37 -----------
 .../org/apache/comet/shims/ShimFileFormat.scala    | 20 +++---
 .../comet/shims/ShimCastOverflowException.scala    | 37 -----------
 .../org/apache/comet/shims/ShimFileFormat.scala    | 50 --------------
 .../comet/shims/ShimResolveDefaultColumns.scala    | 38 -----------
 .../comet/shims/ShimCastOverflowException.scala    | 37 -----------
 dev/release/build-release-comet.sh                 |  2 -
 docs/source/user-guide/installation.md             |  3 -
 pom.xml                                            |  9 ---
 spark/pom.xml                                      |  4 --
 .../apache/comet/CometSparkSessionExtensions.scala |  9 ++-
 .../comet/parquet/CometParquetFileFormat.scala     |  2 +-
 .../CometParquetPartitionReaderFactory.scala       |  2 +-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  4 +-
 .../scala/org/apache/comet/serde/aggregates.scala  |  6 +-
 .../src/main/scala/org/apache/spark/Plugins.scala  |  4 +-
 .../spark/sql/comet/CometBatchScanExec.scala       |  5 +-
 .../sql/comet/CometBroadcastExchangeExec.scala     | 22 ++-----
 .../org/apache/spark/sql/comet/CometScanExec.scala |  3 +-
 .../sql/comet/CometTakeOrderedAndProjectExec.scala |  7 +-
 .../shuffle/CometShuffleExchangeExec.scala         | 15 -----
 .../org/apache/spark/sql/comet/operators.scala     | 15 +++--
 .../comet/plans/AliasAwareOutputExpression.scala   | 64 ++----------------
 .../PartitioningPreservingUnaryExecNode.scala      | 76 ----------------------
 .../org/apache/comet/shims/ShimSQLConf.scala}      |  7 +-
 .../spark/sql/comet/shims/ShimCometScanExec.scala  | 55 +++-------------
 .../org/apache/comet/shims/ShimSQLConf.scala       |  9 ---
 .../spark/sql/comet/shims/ShimCometScanExec.scala  |  4 --
 .../comet/shims/ShimCometBatchScanExec.scala       | 33 ----------
 .../shims/ShimCometBroadcastHashJoinExec.scala     | 39 -----------
 .../comet/shims/ShimCometShuffleExchangeExec.scala |  2 +-
 .../shims/ShimCometSparkSessionExtensions.scala    | 14 +---
 .../apache/comet/shims/ShimQueryPlanSerde.scala    | 18 -----
 .../shims/ShimCometBroadcastExchangeExec.scala     | 51 ---------------
 .../spark/comet/shims/ShimCometDriverPlugin.scala  |  7 --
 .../comet/shims/ShimCometBatchScanExec.scala       | 29 ---------
 .../shims/ShimCometBroadcastHashJoinExec.scala     | 31 ---------
 .../shims/ShimCometSparkSessionExtensions.scala    |  6 +-
 .../apache/comet/shims/ShimQueryPlanSerde.scala    |  5 +-
 .../org/apache/comet/shims/ShimSQLConf.scala       |  4 --
 .../shims/ShimCometBroadcastExchangeExec.scala     | 30 ---------
 .../spark/comet/shims/ShimCometDriverPlugin.scala  |  4 --
 .../spark/sql/comet/shims/ShimCometScanExec.scala  |  4 --
 .../org/apache/comet/shims/ShimSQLConf.scala       | 46 -------------
 .../scala/org/apache/comet/CometCastSuite.scala    |  1 -
 .../org/apache/comet/CometExpressionSuite.scala    | 46 ++++++-------
 .../apache/comet/exec/CometExec3_4PlusSuite.scala  | 38 +++++++----
 .../org/apache/comet/exec/CometExecSuite.scala     |  3 +-
 .../org/apache/spark/sql/CometTPCHQuerySuite.scala |  8 +--
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |  6 +-
 .../comet/shims/ShimCometTPCHQuerySuite.scala      |  0
 55 files changed, 137 insertions(+), 870 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index 52939dd7d..9b0927b40 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -181,7 +181,6 @@ under the License.
               <sources>
                 <source>src/main/${shims.majorVerSrc}</source>
                 <source>src/main/${shims.minorVerSrc}</source>
-                <source>src/main/${shims.pre35Src}</source>
               </sources>
             </configuration>
           </execution>
diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
index 5fd348eeb..e010f8ab7 100644
--- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
@@ -23,11 +23,10 @@ import java.math.BigInteger;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import org.apache.comet.shims.ShimResolveDefaultColumns;
-
 /**
  * A column reader that always return constant vectors. Used for reading partition columns, for
  * instance.
@@ -41,7 +40,9 @@ public class ConstantColumnReader extends MetadataColumnReader {
 
   public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
     this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
-    this.value = ShimResolveDefaultColumns.getExistenceDefaultValue(field);
+    this.value =
+        ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
+            0];
     init(value);
   }
 
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala b/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala
index a5d674de2..ccd525939 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala
@@ -19,7 +19,17 @@
 
 package org.apache.spark.sql.comet
 
-import org.apache.spark.sql.comet.shims.ShimCastOverflowException
+import org.apache.spark.SparkArithmeticException
+import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
+import org.apache.spark.sql.internal.SQLConf
 
 class CastOverflowException(t: String, from: String, to: String)
-    extends ShimCastOverflowException(t, from, to) {}
+    extends SparkArithmeticException(
+      "CAST_OVERFLOW",
+      Map(
+        "value" -> t,
+        "sourceType" -> s""""$from"""",
+        "targetType" -> s""""$to"""",
+        "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
+      Array.empty,
+      "") {}
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimResolveDefaultColumns.scala b/common/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala
similarity index 61%
rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimResolveDefaultColumns.scala
rename to common/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala
index 60e21765b..7b4911e81 100644
--- a/common/src/main/spark-4.0/org/apache/comet/shims/ShimResolveDefaultColumns.scala
+++ b/common/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala
@@ -19,11 +19,15 @@
 
 package org.apache.comet.shims
 
+import org.apache.spark.sql.execution.datasources.{FileFormat, RowIndexUtil}
+import org.apache.spark.sql.types.StructType
 
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.types.{StructField, StructType}
+object ShimFileFormat {
 
-object ShimResolveDefaultColumns {
-  def getExistenceDefaultValue(field: StructField): Any =
-    ResolveDefaultColumns.getExistenceDefaultValues(StructType(Seq(field))).head
+  // A name for a temporary column that holds row indexes computed by the file format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+
+  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
+    RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
 }
diff --git a/common/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index f641b192b..000000000
--- a/common/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-//       Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
-  extends SparkArithmeticException(
-    "CAST_OVERFLOW",
-    Map(
-      "value" -> t,
-      "sourceType" -> s""""$from"""",
-      "targetType" -> s""""$to"""",
-      "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
-    Array.empty,
-    "") {}
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala b/common/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala
similarity index 58%
rename from spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
rename to common/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala
index 983e099f8..1702db135 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
+++ b/common/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala
@@ -19,17 +19,15 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
+import org.apache.spark.sql.types.StructType
 
-trait ShimCometTakeOrderedAndProjectExec {
+object ShimFileFormat {
+  // A name for a temporary column that holds row indexes computed by the file format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
 
-  /**
-   * TODO: delete after dropping Spark 3.3 support
-   */
-  protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = {
-    plan.getClass.getDeclaredFields
-      .filter(_.getName == "offset")
-      .map { a => a.setAccessible(true); a.get(plan).asInstanceOf[Int] }
-      .headOption
-  }
+  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
+    ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
 }
diff --git a/common/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index f641b192b..000000000
--- a/common/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-//       Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
-  extends SparkArithmeticException(
-    "CAST_OVERFLOW",
-    Map(
-      "value" -> t,
-      "sourceType" -> s""""$from"""",
-      "targetType" -> s""""$to"""",
-      "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
-    Array.empty,
-    "") {}
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
deleted file mode 100644
index c34c947b5..000000000
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.types.{LongType, StructField, StructType}
-
-object ShimFileFormat {
-
-  // TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX
-  val ROW_INDEX = "row_index"
-
-  // A name for a temporary column that holds row indexes computed by the file format reader
-  // until they can be placed in the _metadata struct.
-  // TODO: remove after dropping Spark 3.3 support and directly use
-  //       FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
-  val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"
-
-  // TODO: remove after dropping Spark 3.3 support and directly use
-  //       RowIndexUtil.findRowIndexColumnIndexInSchema
-  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
-    sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) =>
-      field.name == ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
-    } match {
-      case Some((field: StructField, idx: Int)) =>
-        if (field.dataType != LongType) {
-          throw new RuntimeException(
-            s"${ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of 
LongType")
-        }
-        idx
-      case _ => -1
-    }
-  }
-}
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
deleted file mode 100644
index 4f7d49831..000000000
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import scala.util.Try
-
-import org.apache.spark.sql.types.{StructField, StructType}
-
-object ShimResolveDefaultColumns {
-  // TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns
-  def getExistenceDefaultValue(field: StructField): Any =
-    Try {
-      // scalastyle:off classforname
-      Class.forName("org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$")
-      // scalastyle:on classforname
-    }.map { objClass =>
-      val objInstance = objClass.getField("MODULE$").get(null)
-      val method = objClass.getMethod("getExistenceDefaultValues", classOf[StructType])
-      method.invoke(objInstance, StructType(Seq(field))).asInstanceOf[Array[Any]].head
-    }.getOrElse(null)
-}
diff --git a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index f641b192b..000000000
--- a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-//       Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
-  extends SparkArithmeticException(
-    "CAST_OVERFLOW",
-    Map(
-      "value" -> t,
-      "sourceType" -> s""""$from"""",
-      "targetType" -> s""""$to"""",
-      "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
-    Array.empty,
-    "") {}
diff --git a/dev/release/build-release-comet.sh b/dev/release/build-release-comet.sh
index bd5f43e09..e8339213e 100755
--- a/dev/release/build-release-comet.sh
+++ b/dev/release/build-release-comet.sh
@@ -192,8 +192,6 @@ LOCAL_REPO=$(mktemp -d /tmp/comet-staging-repo-XXXXX)
 
 ./mvnw  "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.4 -P scala-2.12  -DskipTests install
 ./mvnw  "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.4 -P scala-2.13  -DskipTests install
-./mvnw  "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.3 -P scala-2.12  -DskipTests install
-./mvnw  "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.3 -P scala-2.13  -DskipTests install
 ./mvnw  "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.5 -P scala-2.12  -DskipTests install
 ./mvnw  "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.5 -P scala-2.13  -DskipTests install
 
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index 937280ae4..2a2fc9c8c 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -32,7 +32,6 @@ Make sure the following requirements are met and software installed on your mach
 
 Comet currently supports the following versions of Apache Spark:
 
-- 3.3.x (Java 8/11/17, Scala 2.12/2.13)
 - 3.4.x (Java 8/11/17, Scala 2.12/2.13)
 - 3.5.x (Java 8/11/17, Scala 2.12/2.13)
 
@@ -51,8 +50,6 @@ is currently necessary to build from source.
 
 Here are the direct links for downloading the Comet 0.6.0 jar file.
 
-- [Comet plugin for Spark 3.3 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.3_2.12/0.6.0/comet-spark-spark3.3_2.12-0.6.0.jar)
-- [Comet plugin for Spark 3.3 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.3_2.13/0.6.0/comet-spark-spark3.3_2.13-0.6.0.jar)
 - [Comet plugin for Spark 3.4 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.12/0.6.0/comet-spark-spark3.4_2.12-0.6.0.jar)
 - [Comet plugin for Spark 3.4 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.13/0.6.0/comet-spark-spark3.4_2.13-0.6.0.jar)
 - [Comet plugin for Spark 3.5 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.6.0/comet-spark-spark3.5_2.12-0.6.0.jar)
diff --git a/pom.xml b/pom.xml
index cf82935cd..259760d51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,12 +95,8 @@ under the License.
       -Djdk.reflect.useDirectMethodHandle=false
     </extraJavaTestArgs>
     <argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
-    <additional.3_4.test.source>spark-3.4-plus</additional.3_4.test.source>
-    <additional.3_5.test.source>not-needed</additional.3_5.test.source>
-    <additional.pre35.test.source>spark-pre-3.5</additional.pre35.test.source>
     <shims.majorVerSrc>spark-3.x</shims.majorVerSrc>
     <shims.minorVerSrc>spark-3.4</shims.minorVerSrc>
-    <shims.pre35Src>spark-pre-3.5</shims.pre35Src>
   </properties>
 
   <dependencyManagement>
@@ -565,9 +561,6 @@ under the License.
         <parquet.version>1.13.1</parquet.version>
         <slf4j.version>2.0.7</slf4j.version>
         <shims.minorVerSrc>spark-3.5</shims.minorVerSrc>
-        <shims.pre35Src>not-needed</shims.pre35Src>
-        <additional.pre35.test.source>not-needed</additional.pre35.test.source>
-        <additional.3_5.test.source>spark-3.5</additional.3_5.test.source>
       </properties>
     </profile>
 
@@ -585,8 +578,6 @@ under the License.
         <slf4j.version>2.0.13</slf4j.version>
         <shims.majorVerSrc>spark-4.0</shims.majorVerSrc>
         <shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
-        <shims.pre35Src>not-needed</shims.pre35Src>
-        <additional.pre35.test.source>not-needed</additional.pre35.test.source>
         <!-- Use jdk17 by default -->
         <java.version>17</java.version>
         <maven.compiler.source>${java.version}</maven.compiler.source>
diff --git a/spark/pom.xml b/spark/pom.xml
index 7cd0b5057..d28b5d29e 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -260,9 +260,6 @@ under the License.
             </goals>
             <configuration>
               <sources>
-                <source>src/test/${additional.3_4.test.source}</source>
-                <source>src/test/${additional.3_5.test.source}</source>
-                <source>src/test/${additional.pre35.test.source}</source>
                 <source>src/test/${shims.majorVerSrc}</source>
                 <source>src/test/${shims.minorVerSrc}</source>
               </sources>
@@ -278,7 +275,6 @@ under the License.
               <sources>
                 <source>src/main/${shims.majorVerSrc}</source>
                 <source>src/main/${shims.minorVerSrc}</source>
-                <source>src/main/${shims.pre35Src}</source>
               </sources>
             </configuration>
           </execution>
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 613b3ac00..800cab832 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -438,7 +438,7 @@ class CometSparkSessionExtensions
               op
           }
 
-        case op: LocalLimitExec if getOffset(op) == 0 =>
+        case op: LocalLimitExec =>
           val newOp = transform1(op)
           newOp match {
             case Some(nativeOp) =>
@@ -447,7 +447,7 @@ class CometSparkSessionExtensions
               op
           }
 
-        case op: GlobalLimitExec if getOffset(op) == 0 =>
+        case op: GlobalLimitExec if op.offset == 0 =>
           val newOp = transform1(op)
           newOp match {
             case Some(nativeOp) =>
@@ -459,10 +459,10 @@ class CometSparkSessionExtensions
         case op: CollectLimitExec
             if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)
               && isCometShuffleEnabled(conf)
-              && getOffset(op) == 0 =>
+              && op.offset == 0 =>
           QueryPlanSerde.operator2Proto(op) match {
             case Some(nativeOp) =>
-              val offset = getOffset(op)
+              val offset = op.offset
               val cometOp =
                 CometCollectLimitExec(op, op.limit, offset, op.child)
               CometSinkPlaceHolder(nativeOp, op, cometOp)
@@ -742,7 +742,6 @@ class CometSparkSessionExtensions
         // exchange. It is only used for Comet native execution. We only transform Spark broadcast
         // exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
         // broadcast exchange is forced to be enabled by Comet config.
-        // Note that `CometBroadcastExchangeExec` is only supported for Spark 3.4+.
         case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
           val newChildren = plan.children.map {
             case b: BroadcastExchangeExec
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 4b775db02..a3e917f79 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -91,7 +91,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringPredicate = getPushDownStringPredicate(sqlConf)
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val optionsMap = CaseInsensitiveMap[String](options)
     val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index 3a430d4c6..01ccde94b 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -63,7 +63,7 @@ case class CometParquetPartitionReaderFactory(
   private val pushDownDate = sqlConf.parquetFilterPushDownDate
   private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringPredicate = getPushDownStringPredicate(sqlConf)
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val parquetFilterPushDown = sqlConf.parquetFilterPushDown
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a1cb5a732..ec94ffd55 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1997,9 +1997,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         }
       case _: ArrayRemove => convert(CometArrayRemove)
       case _: ArrayContains => convert(CometArrayContains)
-      // Function introduced in 3.4.0. Refer by name to provide compatibility
-      // with older Spark builds
-      case _ if expr.prettyName == "array_append" => convert(CometArrayAppend)
+      case _: ArrayAppend => convert(CometArrayAppend)
       case _: ArrayIntersect => convert(CometArrayIntersect)
       case _: ArrayJoin => convert(CometArrayJoin)
       case _: ArraysOverlap => convert(CometArraysOverlap)
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index da5e9ff53..0284e553c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import scala.collection.JavaConverters.asJavaIterableConverter
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Covariance, CovPopulation, CovSample, First, Last, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ByteType, DecimalType, IntegerType, LongType, ShortType, StringType}
@@ -140,7 +140,7 @@ object CometAverage extends CometAggregateExpressionSerde with ShimQueryPlanSerd
       return None
     }
 
-    if (!isLegacyMode(avg)) {
+    if (avg.evalMode != EvalMode.LEGACY) {
       withInfo(aggExpr, "Average is only supported in legacy mode")
       return None
     }
@@ -195,7 +195,7 @@ object CometSum extends CometAggregateExpressionSerde with ShimQueryPlanSerde {
       return None
     }
 
-    if (!isLegacyMode(sum)) {
+    if (sum.evalMode != EvalMode.LEGACY) {
       withInfo(aggExpr, "Sum is only supported in legacy mode")
       return None
     }
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index c4adf48e9..835fa515b 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -25,7 +25,7 @@ import java.util.Collections
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.comet.shims.ShimCometDriverPlugin
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
 import org.apache.spark.sql.internal.StaticSQLConf
 
 import org.apache.comet.{CometConf, CometSparkSessionExtensions}
@@ -57,7 +57,7 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
         // By default, executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384MB
         val executorMemory =
           sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY_DEFAULT)
-        val memoryOverheadFactor = getMemoryOverheadFactor(sc.getConf)
+        val memoryOverheadFactor = sc.getConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
         val memoryOverheadMinMib = getMemoryOverheadMinMib(sc.getConf)
 
         Math.max((executorMemory * memoryOverheadFactor).toLong, memoryOverheadMinMib)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index dc1f2db8b..220df65ac 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.comet
 
 import org.apache.spark.rdd._
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, Literal, SortOrder}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.read._
@@ -33,12 +33,11 @@ import org.apache.spark.sql.vectorized._
 import com.google.common.base.Objects
 
 import org.apache.comet.{DataTypeSupport, MetricsSupport}
-import org.apache.comet.shims.ShimCometBatchScanExec
 
 case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expression])
     extends DataSourceV2ScanExecBase
-    with ShimCometBatchScanExec
     with CometPlan {
+  def ordering: Option[Seq[SortOrder]] = wrapped.ordering
 
   wrapped.logicalLink.foreach(setLogicalLink)
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 6bc519ab9..3285159be 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -29,9 +29,7 @@ import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal
 
 import org.apache.spark.{broadcast, Partition, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.comet.shims.ShimCometBroadcastExchangeExec
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -61,16 +59,12 @@ import org.apache.comet.CometRuntimeException
  * Note that this class cannot extend `CometExec` as usual similar to other Comet operators. As
  * the trait `BroadcastExchangeLike` in Spark extends abstract class `Exchange`, it limits the
  * flexibility to extend `CometExec` and `Exchange` at the same time.
- *
- * Note that this only supports Spark 3.4 and later, because the serialization class
- * `ChunkedByteBuffer` is only serializable in Spark 3.4 and later.
  */
 case class CometBroadcastExchangeExec(
     originalPlan: SparkPlan,
     override val output: Seq[Attribute],
     override val child: SparkPlan)
-    extends BroadcastExchangeLike
-    with ShimCometBroadcastExchangeExec {
+    extends BroadcastExchangeLike {
   import CometBroadcastExchangeExec._
 
   override val runId: UUID = UUID.randomUUID
@@ -181,7 +175,8 @@ case class CometBroadcastExchangeExec(
         longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast - beforeBuild)
 
         // (3.4 only) SPARK-39983 - Broadcast the relation without caching the unserialized object.
-        val broadcasted = doBroadcast(sparkContext, batches)
+        val broadcasted = sparkContext
+          .broadcastInternal(batches, true)
           .asInstanceOf[broadcast.Broadcast[Any]]
         longMetric("broadcastTime") += NANOSECONDS.toMillis(System.nanoTime() - beforeBroadcast)
         val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
@@ -193,16 +188,7 @@ case class CometBroadcastExchangeExec(
         // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
         // will catch this exception and re-throw the wrapped fatal throwable.
         case oe: OutOfMemoryError =>
-          // Spark 3.4 has two parameters for `notEnoughMemoryToBuildAndBroadcastTableError`, which
-          // is different to Spark 3.3. We simply create the error message here.
-          val error =
-            new OutOfMemoryError(
-              "Not enough memory to build and broadcast the table to all " +
-                "worker nodes. As a workaround, you can either disable broadcast by setting " +
-                s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " +
-                s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.")
-              .initCause(oe.getCause)
-          val ex = new SparkFatalException(error)
+          val ex = new SparkFatalException(oe)
           promise.tryFailure(ex)
           throw ex
         case e if !NonFatal(e) =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 7c8db1ab9..9e97c06d5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.comet.shims.ShimCometScanExec
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
@@ -307,7 +308,7 @@ case class CometScanExec(
         .groupBy { f =>
           BucketingUtils
             .getBucketId(new Path(f.filePath.toString()).getName)
-            .getOrElse(throw invalidBucketFile(f.filePath.toString(), sparkContext.version))
+            .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString()))
         }
 
     val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 19586628a..885253868 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import org.apache.comet.serde.QueryPlanSerde.exprToProto
 import org.apache.comet.serde.QueryPlanSerde.supportedSortType
-import org.apache.comet.shims.ShimCometTakeOrderedAndProjectExec
 
 /**
  * Comet physical plan node for Spark `TakeOrderedAndProjectExec`.
@@ -130,12 +129,12 @@ case class CometTakeOrderedAndProjectExec(
     this.copy(child = newChild)
 }
 
-object CometTakeOrderedAndProjectExec extends ShimCometTakeOrderedAndProjectExec {
+object CometTakeOrderedAndProjectExec {
   // TODO: support offset for Spark 3.4
   def isSupported(plan: TakeOrderedAndProjectExec): Boolean = {
     val exprs = plan.projectList.map(exprToProto(_, plan.child.output))
     val sortOrders = plan.sortOrder.map(exprToProto(_, plan.child.output))
-    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && getOffset(plan).getOrElse(
-      0) == 0 && supportedSortType(plan, plan.sortOrder)
+    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && plan.offset == 0 &&
+    supportedSortType(plan, plan.sortOrder)
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 7c27fb196..f2af4402d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -617,18 +617,3 @@ class CometShuffleWriteProcessor(
     }
   }
 }
-
-/**
- * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.3.
- */
-private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
-  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-}
-
-/**
- * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.3.
- */
-private[spark] class ConstantPartitioner extends Partitioner {
-  override def numPartitions: Int = 1
-  override def getPartition(key: Any): Int = 0
-}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index f36b41aa6..3b9c6bdbc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -31,11 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expre
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, HashPartitioningLike, Partitioning, PartitioningCollection, UnknownPartitioning}
 import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
-import org.apache.spark.sql.comet.plans.PartitioningPreservingUnaryExecNode
 import org.apache.spark.sql.comet.util.Utils
 import org.apache.spark.sql.execution.{BinaryExecNode, ColumnarToRowExec, ExecSubqueryExpression, ExplainUtils, LeafExecNode, ScalarSubquery, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.PartitioningPreservingUnaryExecNode
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -47,7 +47,6 @@ import com.google.common.base.Objects
 
 import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException}
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.shims.ShimCometBroadcastHashJoinExec
 
 /**
  * A Comet physical operator
@@ -793,8 +792,7 @@ case class CometBroadcastHashJoinExec(
     override val left: SparkPlan,
     override val right: SparkPlan,
     override val serializedPlanOpt: SerializedPlan)
-    extends CometBinaryExec
-    with ShimCometBroadcastHashJoinExec {
+    extends CometBinaryExec {
 
   // The following logic of `outputPartitioning` is copied from Spark `BroadcastHashJoinExec`.
   protected lazy val streamedPlan: SparkPlan = buildSide match {
@@ -883,8 +881,13 @@ case class CometBroadcastHashJoinExec(
       }
     }
 
+    val hashPartitioningLikeExpressions =
+      partitioning match {
+        case p: HashPartitioningLike => p.expressions
+        case _ => Seq()
+      }
     PartitioningCollection(
-      generateExprCombinations(getHashPartitioningLikeExpressions(partitioning), Nil)
+      generateExprCombinations(hashPartitioningLikeExpressions, Nil)
        .map(exprs => partitioning.withNewChildren(exprs).asInstanceOf[Partitioning]))
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
index f28ca2bba..1388f7d97 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
@@ -23,16 +23,13 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.internal.SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT
 
 /**
  * A trait that provides functionality to handle aliases in the `outputExpressions`.
  */
 trait AliasAwareOutputExpression extends SQLConfHelper {
-  // `SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT` is Spark 3.4+ only.
-  // Use a default value for now.
-  protected val aliasCandidateLimit: Int =
-    conf.getConfString("spark.sql.optimizer.expressionProjectionCandidateLimit", "100").toInt
+  protected val aliasCandidateLimit: Int = conf.getConf(EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
   protected def outputExpressions: Seq[NamedExpression]
 
   /**
@@ -76,7 +73,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
    */
   protected def projectExpression(expr: Expression): Stream[Expression] = {
     val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
-    multiTransformDown(expr) {
+    expr.multiTransformDown {
       // Mapping with aliases
       case e: Expression if aliasMap.contains(e.canonicalized) =>
         aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty)
 
       // This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty`
       // there.
       case a: Attribute if !outputSet.contains(a) => Seq.empty
-    }
-  }
-
-  // Copied from Spark 3.4+ to make it available in Spark 3.3+.
-  def multiTransformDown(expr: Expression)(
-      rule: PartialFunction[Expression, Seq[Expression]]): Stream[Expression] = {
-
-    // We could return `Seq(this)` if the `rule` doesn't apply and handle both
-    // - the doesn't apply
-    // - and the rule returns a one element `Seq(originalNode)`
-    // cases together. The returned `Seq` can be a `Stream` and unfortunately it doesn't seem like
-    // there is a way to match on a one element stream without eagerly computing the tail's head.
-    // This contradicts with the purpose of only taking the necessary elements from the
-    // alternatives. I.e. the "multiTransformDown is lazy" test case in `TreeNodeSuite` would fail.
-    // Please note that this behaviour has a downside as well that we can only mark the rule on the
-    // original node ineffective if the rule didn't match.
-    var ruleApplied = true
-    val afterRules = CurrentOrigin.withOrigin(expr.origin) {
-      rule.applyOrElse(
-        expr,
-        (_: Expression) => {
-          ruleApplied = false
-          Seq.empty
-        })
-    }
-
-    val afterRulesStream = if (afterRules.isEmpty) {
-      if (ruleApplied) {
-        // If the rule returned with empty alternatives then prune
-        Stream.empty
-      } else {
-        // If the rule was not applied then keep the original node
-        Stream(expr)
-      }
-    } else {
-      // If the rule was applied then use the returned alternatives
-      afterRules.toStream.map { afterRule =>
-        if (expr fastEquals afterRule) {
-          expr
-        } else {
-          afterRule.copyTagsFrom(expr)
-          afterRule
-        }
-      }
-    }
-
-    afterRulesStream.flatMap { afterRule =>
-      if (afterRule.containsChild.nonEmpty) {
-        generateCartesianProduct(afterRule.children.map(c => () => multiTransformDown(c)(rule)))
-          .map(afterRule.withNewChildren)
-      } else {
-        Stream(afterRule)
-      }
-    }
+    }.toStream
   }
 
   def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): Stream[Seq[T]] = {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
deleted file mode 100644
index d584e8609..000000000
--- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.plans
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
-import org.apache.spark.sql.execution.UnaryExecNode
-
-/**
- * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
- * satisfies distribution requirements.
- *
- * This is copied from Spark's `PartitioningPreservingUnaryExecNode` because it is only available
- * in Spark 3.4+. This is a workaround to make it available in Spark 3.3+.
- */
-trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression {
-  final override def outputPartitioning: Partitioning = {
-    val partitionings: Seq[Partitioning] = if (hasAlias) {
-      flattenPartitioning(child.outputPartitioning).flatMap {
-        case e: Expression =>
-          // We need unique partitionings but if the input partitioning is
-          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
-          // the projection we have 4 partitionings:
-          // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
-          // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
-          // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
-          val partitioningSet = mutable.Set.empty[Expression]
-          projectExpression(e)
-            .filter(e => partitioningSet.add(e.canonicalized))
-            .take(aliasCandidateLimit)
-            .asInstanceOf[Stream[Partitioning]]
-        case o => Seq(o)
-      }
-    } else {
-      // Filter valid partitiongs (only reference output attributes of the current plan node)
-      val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
-      flattenPartitioning(child.outputPartitioning).filter {
-        case e: Expression => e.references.subsetOf(outputSet)
-        case _ => true
-      }
-    }
-    partitionings match {
-      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
-      case Seq(p) => p
-      case ps => PartitioningCollection(ps)
-    }
-  }
-
-  private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
-    partitioning match {
-      case PartitioningCollection(childPartitionings) =>
-        childPartitionings.flatMap(flattenPartitioning)
-      case rest =>
-        rest +: Nil
-    }
-  }
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSQLConf.scala
similarity index 80%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
rename to spark/src/main/spark-3.4/org/apache/comet/shims/ShimSQLConf.scala
index 5a8ac97b3..0bff426c2 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSQLConf.scala
@@ -19,8 +19,9 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 
-trait ShimCometTakeOrderedAndProjectExec {
-  protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = Some(plan.offset)
+trait ShimSQLConf {
+  protected val LEGACY = LegacyBehaviorPolicy.LEGACY
+  protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
 }
diff --git a/spark/src/main/spark-pre-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
similarity index 59%
rename from spark/src/main/spark-pre-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
rename to spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index a25be3bc0..1bd3c7c98 100644
--- a/spark/src/main/spark-pre-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -20,8 +20,9 @@
 package org.apache.spark.sql.comet.shims
 
 import org.apache.comet.shims.ShimFileFormat
+
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.SparkException
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
@@ -34,57 +35,21 @@ import org.apache.spark.sql.types.StructType
 trait ShimCometScanExec {
   def wrapped: FileSourceScanExec
 
-  // TODO: remove after dropping Spark 3.3 support
-  lazy val metadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods
-    .filter(_.getName == "metadataColumns")
-    .map { a => a.setAccessible(true); a }
-    .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
-
-  // TODO: remove after dropping Spark 3.3 support and directly call
-  //       wrapped.fileConstantMetadataColumns
   lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
-    wrapped.getClass.getDeclaredMethods
-      .filter(_.getName == "fileConstantMetadataColumns")
-      .map { a => a.setAccessible(true); a }
-      .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
+    wrapped.fileConstantMetadataColumns
 
-  // TODO: remove after dropping Spark 3.3 and 3.4 support and directly call new FileScanRDD
   protected def newFileScanRDD(
       fsRelation: HadoopFsRelation,
       readFunction: PartitionedFile => Iterator[InternalRow],
       filePartitions: Seq[FilePartition],
       readSchema: StructType,
-      options: ParquetOptions): FileScanRDD =
-    classOf[FileScanRDD].getDeclaredConstructors
-      // Prevent to pick up incorrect constructors from any custom Spark forks.
-      .filter(c => List(5, 6).contains(c.getParameterCount()))
-      .map { c =>
-        c.getParameterCount match {
-          case 5 =>
-            c.newInstance(fsRelation.sparkSession, readFunction, filePartitions, readSchema, metadataColumns)
-          case 6 =>
-            c.newInstance(
-              fsRelation.sparkSession,
-              readFunction,
-              filePartitions,
-              readSchema,
-              fileConstantMetadataColumns,
-              options)
-        }
-      }
-      .last
-      .asInstanceOf[FileScanRDD]
-
-  // TODO: remove after dropping Spark 3.3 support and directly call
-  //       QueryExecutionErrors.SparkException
-  protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
-    val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path)
-    classOf[SparkException].getDeclaredConstructors
-      .filter(_.getParameterCount == 3)
-      .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
-      .last
-      .asInstanceOf[SparkException]
-  }
+      options: ParquetOptions): FileScanRDD = new FileScanRDD(
+    fsRelation.sparkSession,
+    readFunction,
+    filePartitions,
+    readSchema,
+    fileConstantMetadataColumns,
+    options)
 
   protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
     // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634)
diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
index aafe4aa7c..bdb273946 100644
--- a/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
+++ b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
@@ -20,17 +20,8 @@
 package org.apache.comet.shims
 
 import org.apache.spark.sql.internal.LegacyBehaviorPolicy
-import org.apache.spark.sql.internal.SQLConf
 
 trait ShimSQLConf {
-
-  /**
-   * Spark 3.4 renamed parquetFilterPushDownStringStartWith to
-   * parquetFilterPushDownStringPredicate
-   */
-  protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
-    sqlConf.parquetFilterPushDownStringPredicate
-
   protected val LEGACY = LegacyBehaviorPolicy.LEGACY
   protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
 }
diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index e4a5584aa..cee444e3f 100644
--- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -52,9 +51,6 @@ trait ShimCometScanExec {
     fsRelation.fileFormat.fileConstantMetadataExtractors,
     options)
 
-  protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
-    QueryExecutionErrors.invalidBucketFile(path)
-
   // see SPARK-39634
   protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
 
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
deleted file mode 100644
index 49e931110..000000000
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-trait ShimCometBatchScanExec {
-  def wrapped: BatchScanExec
-
-  // Only for Spark 3.4+
-  def ordering: Option[Seq[SortOrder]] = wrapped.getClass.getDeclaredMethods
-    .filter(_.getName == "ordering")
-    .flatMap(_.invoke(wrapped).asInstanceOf[Option[Seq[SortOrder]]])
-    .headOption
-}
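
For reference, BatchScanExec exposes `ordering` directly on every Spark version that remains supported, which is what made the reflective lookup above removable; a minimal sketch of the direct form (mirroring the spark-4.0 shim that is also deleted later in this diff):

    import org.apache.spark.sql.catalyst.expressions.SortOrder
    import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

    trait OrderingSketch {
      def wrapped: BatchScanExec
      // Direct field access; no getDeclaredMethods probe is needed on 3.4+.
      def ordering: Option[Seq[SortOrder]] = wrapped.ordering
    }
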
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
deleted file mode 100644
index 442bb9e58..000000000
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-
-trait ShimCometBroadcastHashJoinExec {
-
-  /**
-   * Returns the expressions that are used for hash partitioning including `HashPartitioning` and
-   * `CoalescedHashPartitioning`. They shares same trait `HashPartitioningLike` since Spark 3.4,
-   * but Spark 3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`.
-   *
-   * TODO: remove after dropping Spark 3.3 support.
-   */
-  def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] = {
-    partitioning.getClass.getDeclaredMethods
-      .filter(_.getName == "expressions")
-      .flatMap(_.invoke(partitioning).asInstanceOf[Seq[Expression]])
-  }
-}
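
With Spark 3.3 gone, HashPartitioningLike exists on every supported version, so the reflective lookup above can collapse to a plain pattern match; a minimal sketch (the same shape as the spark-4.0 shim removed further down):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioningLike, Partitioning}

    trait HashPartitioningSketch {
      // Covers HashPartitioning and CoalescedHashPartitioning via their shared trait.
      def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] =
        partitioning match {
          case p: HashPartitioningLike => p.expressions
          case _ => Seq.empty
        }
    }
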
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
index 965b6851e..319803544 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.types.{StructField, StructType}
 
 trait ShimCometShuffleExchangeExec {
-  // TODO: remove after dropping Spark 3.3 support
+  // TODO: remove after dropping Spark 3.4 support
   def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
     val advisoryPartitionSize = s.getClass.getDeclaredMethods
       .filter(_.getName == "advisoryPartitionSize")
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index c8aeacf2a..ab20cd7bd 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -19,27 +19,15 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 
 trait ShimCometSparkSessionExtensions {
-  /**
-   * TODO: delete after dropping Spark 3.3 support
-   */
-  def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
-
   /**
    * TODO: delete after dropping Spark 3.x support and directly call
    *       SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
    */
   protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = "spark.sql.extendedExplainProviders"
 
-  private def getOffsetOpt(plan: SparkPlan): Option[Int] = plan.getClass.getDeclaredFields
-    .filter(_.getName == "offset")
-    .map { a => a.setAccessible(true); a.get(plan) }
-    .filter(_.isInstanceOf[Int])
-    .map(_.asInstanceOf[Int])
-    .headOption
-
   // Extended info is available only since Spark 4.0.0
   // (https://issues.apache.org/jira/browse/SPARK-47289)
   def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
index 1b0996d9d..c47b399cf 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
@@ -44,22 +44,4 @@ trait ShimQueryPlanSerde {
       failOnError.head
     }
   }
-
-  // TODO: delete after drop Spark 3.3 support
-  // This method is used to check if the aggregate function is in legacy mode.
-  // EvalMode is an enum object in Spark 3.4.
-  def isLegacyMode(aggregate: DeclarativeAggregate): Boolean = {
-    val evalMode = aggregate.getClass.getDeclaredMethods
-      .flatMap(m =>
-        m.getName match {
-          case "evalMode" => Some(m.invoke(aggregate))
-          case _ => None
-        })
-
-    if (evalMode.isEmpty) {
-      true
-    } else {
-      "legacy".equalsIgnoreCase(evalMode.head.toString)
-    }
-  }
 }
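
On Spark 3.4+ EvalMode is a concrete enum, so the string-matching probe deleted above reduces to a direct comparison; a minimal sketch, mirroring the spark-4.0 shim methods that are removed later in this diff:

    import org.apache.spark.sql.catalyst.expressions.EvalMode
    import org.apache.spark.sql.catalyst.expressions.aggregate.{Average, Sum}

    trait LegacyModeSketch {
      // Direct enum comparison instead of reflecting on `evalMode` and matching "legacy".
      def isLegacyMode(aggregate: Sum): Boolean = aggregate.evalMode == EvalMode.LEGACY
      def isLegacyMode(aggregate: Average): Boolean = aggregate.evalMode == EvalMode.LEGACY
    }
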
diff --git a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
deleted file mode 100644
index afcf653b4..000000000
--- a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkContext
-import org.apache.spark.broadcast.Broadcast
-
-trait ShimCometBroadcastExchangeExec {
-  // TODO: remove after dropping Spark 3.3 support
-  protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] = {
-    // Spark 3.4 has new API `broadcastInternal` to broadcast the relation without caching the
-    // unserialized object.
-    val classTag = implicitly[ClassTag[T]]
-    val broadcasted = sparkContext.getClass.getDeclaredMethods
-      .filter(_.getName == "broadcastInternal")
-      .map { a => a.setAccessible(true); a }
-      .map { method =>
-        method
-          .invoke(
-            sparkContext.asInstanceOf[Object],
-            value.asInstanceOf[Object],
-            true.asInstanceOf[Object],
-            classTag.asInstanceOf[Object])
-          .asInstanceOf[Broadcast[Any]]
-      }
-      .headOption
-    // Fallback to the old API if the new API is not available.
-    broadcasted
-      .getOrElse(sparkContext.broadcast(value.asInstanceOf[Object]))
-      .asInstanceOf[Broadcast[Any]]
-  }
-}
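
The reflection above targeted the Spark 3.4+ SparkContext.broadcastInternal API; a minimal sketch of the direct call, assuming the caller is compiled under an org.apache.spark package (as these shims are) so the private[spark] method is visible:

    package org.apache.spark.comet.example // hypothetical package under org.apache.spark

    import scala.reflect.ClassTag

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    trait BroadcastSketch {
      // `true` broadcasts the relation without caching the unserialized object.
      def doBroadcast[T: ClassTag](sc: SparkContext, value: T): Broadcast[Any] =
        sc.broadcastInternal(value, true)
    }
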
diff --git a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
index cfb6a0088..d6a7e2d7f 100644
--- a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
+++ b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
@@ -22,17 +22,10 @@ package org.apache.spark.comet.shims
 import org.apache.spark.SparkConf
 
 trait ShimCometDriverPlugin {
-  // `org.apache.spark.internal.config.EXECUTOR_MEMORY_OVERHEAD_FACTOR` was added since Spark 3.3.0
-  private val EXECUTOR_MEMORY_OVERHEAD_FACTOR = "spark.executor.memoryOverheadFactor"
-  private val EXECUTOR_MEMORY_OVERHEAD_FACTOR_DEFAULT = 0.1
   // `org.apache.spark.internal.config.EXECUTOR_MIN_MEMORY_OVERHEAD` was added since Spark 4.0.0
   private val EXECUTOR_MIN_MEMORY_OVERHEAD = "spark.executor.minMemoryOverhead"
   private val EXECUTOR_MIN_MEMORY_OVERHEAD_DEFAULT = 384L
 
-  def getMemoryOverheadFactor(sc: SparkConf): Double =
-    sc.getDouble(
-      EXECUTOR_MEMORY_OVERHEAD_FACTOR,
-      EXECUTOR_MEMORY_OVERHEAD_FACTOR_DEFAULT)
   def getMemoryOverheadMinMib(sc: SparkConf): Long =
     sc.getLong(EXECUTOR_MIN_MEMORY_OVERHEAD, EXECUTOR_MIN_MEMORY_OVERHEAD_DEFAULT)
 }
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
deleted file mode 100644
index d41502f6e..000000000
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-trait ShimCometBatchScanExec {
-  def wrapped: BatchScanExec
-
-  def ordering: Option[Seq[SortOrder]] = wrapped.ordering
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
deleted file mode 100644
index 1f689b400..000000000
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioningLike, Partitioning}
-
-trait ShimCometBroadcastHashJoinExec {
-  protected def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] =
-    partitioning match {
-      case p: HashPartitioningLike => p.expressions
-      case _ => Seq()
-    }
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 9fb7355ee..e68c0cb3e 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -20,17 +20,13 @@
 package org.apache.comet.shims
 
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.execution.{CollectLimitExec, GlobalLimitExec, LocalLimitExec, QueryExecution}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 
 trait ShimCometSparkSessionExtensions {
   protected def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = scan.pushedAggregate
 
-  protected def getOffset(limit: LocalLimitExec): Int = 0
-  protected def getOffset(limit: GlobalLimitExec): Int = limit.offset
-  protected def getOffset(limit: CollectLimitExec): Int = limit.offset
-
   protected def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = true
 
   protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala
index 4d261f3c2..10821881b 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala
@@ -19,7 +19,7 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression, BloomFilterMightContain, EvalMode}
+import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression, BloomFilterMightContain}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Average, Sum}
 
 trait ShimQueryPlanSerde {
@@ -29,9 +29,6 @@ trait ShimQueryPlanSerde {
   protected def getFailOnError(aggregate: Sum): Boolean = aggregate.initQueryContext().isDefined
   protected def getFailOnError(aggregate: Average): Boolean = aggregate.initQueryContext().isDefined
 
-  protected def isLegacyMode(aggregate: Sum): Boolean = aggregate.evalMode.equals(EvalMode.LEGACY)
-  protected def isLegacyMode(aggregate: Average): Boolean = aggregate.evalMode.equals(EvalMode.LEGACY)
-
   protected def isBloomFilterMightContain(binary: BinaryExpression): Boolean =
     binary.isInstanceOf[BloomFilterMightContain]
 }
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
index 574967767..bdb273946 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
@@ -19,13 +19,9 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.LegacyBehaviorPolicy
 
 trait ShimSQLConf {
-  protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
-    sqlConf.parquetFilterPushDownStringPredicate
-
   protected val LEGACY = LegacyBehaviorPolicy.LEGACY
   protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
 }
diff --git a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
deleted file mode 100644
index ba87a2515..000000000
--- a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkContext
-import org.apache.spark.broadcast.Broadcast
-
-trait ShimCometBroadcastExchangeExec {
-  protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] =
-    sparkContext.broadcastInternal(value, true)
-}
diff --git a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
index f7a57a642..de3263bea 100644
--- a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
+++ b/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
@@ -20,13 +20,9 @@
 package org.apache.spark.comet.shims
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.EXECUTOR_MEMORY_OVERHEAD_FACTOR
 import org.apache.spark.internal.config.EXECUTOR_MIN_MEMORY_OVERHEAD
 
 trait ShimCometDriverPlugin {
-  protected def getMemoryOverheadFactor(sparkConf: SparkConf): Double = sparkConf.get(
-    EXECUTOR_MEMORY_OVERHEAD_FACTOR)
-
   protected def getMemoryOverheadMinMib(sparkConf: SparkConf): Long = sparkConf.get(
     EXECUTOR_MIN_MEMORY_OVERHEAD)
 }
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 3edc43278..7fe9ea53a 100644
--- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceConstantMetadataAttribute, Literal}
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, ScalarSubquery}
@@ -53,9 +52,6 @@ trait ShimCometScanExec {
       options)
   }
 
-  protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
-    QueryExecutionErrors.invalidBucketFile(path)
-
   // see SPARK-39634
   protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
 
diff --git a/spark/src/main/spark-pre-3.5/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-pre-3.5/org/apache/comet/shims/ShimSQLConf.scala
deleted file mode 100644
index 579db51c3..000000000
--- a/spark/src/main/spark-pre-3.5/org/apache/comet/shims/ShimSQLConf.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
-
-trait ShimSQLConf {
-
-  /**
-   * Spark 3.4 renamed parquetFilterPushDownStringStartWith to
-   * parquetFilterPushDownStringPredicate
-   *
-   * TODO: delete after dropping Spark 3.3 support and simply use
-   * parquetFilterPushDownStringPredicate
-   */
-  protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
-    sqlConf.getClass.getMethods
-      .flatMap(m =>
-        m.getName match {
-          case "parquetFilterPushDownStringStartWith" | "parquetFilterPushDownStringPredicate" =>
-            Some(m.invoke(sqlConf).asInstanceOf[Boolean])
-          case _ => None
-        })
-      .head
-
-  protected val LEGACY = LegacyBehaviorPolicy.LEGACY
-  protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
-}
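
Since every remaining Spark version exposes the renamed accessor, the reflective probe above reduces to a direct call, as the removed spark-3.5/4.0 shim methods earlier in this diff already did; a minimal sketch:

    import org.apache.spark.sql.internal.SQLConf

    trait PushDownSketch {
      // Spark 3.4+ always has parquetFilterPushDownStringPredicate.
      def pushDownStringPredicate(sqlConf: SQLConf): Boolean =
        sqlConf.parquetFilterPushDownStringPredicate
    }
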
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 7531a9b9a..68febbab2 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1163,7 +1163,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       hasIncompatibleType: Boolean = false,
       testAnsi: Boolean = true): Unit = {
 
-    // we now support the TryCast expression in Spark 3.3
     withTempPath { dir =>
       val data = roundtripParquet(input, dir).coalesce(1)
       data.createOrReplaceTempView("t")
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 48da86a74..c3bd2efef 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2689,32 +2689,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           randomSize = 10000)
         withParquetTable(path1.toString, "tbl1") {
           withParquetTable(path2.toString, "tbl2") {
-            // disable broadcast, as comet on spark 3.3 does not support broadcast exchange
-            withSQLConf(
-              SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-              SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-              checkSparkAnswerAndOperator("""
-                  |select
-                  | t1._2 div t2._2, div(t1._2, t2._2),
-                  | t1._3 div t2._3, div(t1._3, t2._3),
-                  | t1._4 div t2._4, div(t1._4, t2._4),
-                  | t1._5 div t2._5, div(t1._5, t2._5),
-                  | t1._9 div t2._9, div(t1._9, t2._9),
-                  | t1._10 div t2._10, div(t1._10, t2._10),
-                  | t1._11 div t2._11, div(t1._11, t2._11)
-                  | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-                  | order by t1._id""".stripMargin)
-
-              // decimal support requires Spark 3.4 or later
-              checkSparkAnswerAndOperator("""
-                  |select
-                  | t1._12 div t2._12, div(t1._12, t2._12),
-                  | t1._15 div t2._15, div(t1._15, t2._15),
-                  | t1._16 div t2._16, div(t1._16, t2._16),
-                  | t1._17 div t2._17, div(t1._17, t2._17)
-                  | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-                  | order by t1._id""".stripMargin)
-            }
+            checkSparkAnswerAndOperator("""
+                |select
+                | t1._2 div t2._2, div(t1._2, t2._2),
+                | t1._3 div t2._3, div(t1._3, t2._3),
+                | t1._4 div t2._4, div(t1._4, t2._4),
+                | t1._5 div t2._5, div(t1._5, t2._5),
+                | t1._9 div t2._9, div(t1._9, t2._9),
+                | t1._10 div t2._10, div(t1._10, t2._10),
+                | t1._11 div t2._11, div(t1._11, t2._11)
+                | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+                | order by t1._id""".stripMargin)
+
+            checkSparkAnswerAndOperator("""
+                |select
+                | t1._12 div t2._12, div(t1._12, t2._12),
+                | t1._15 div t2._15, div(t1._15, t2._15),
+                | t1._16 div t2._16, div(t1._16, t2._16),
+                | t1._17 div t2._17, div(t1._17, t2._17)
+                | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+                | order by t1._id""".stripMargin)
           }
         }
       }
diff --git a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExec3_4PlusSuite.scala
similarity index 93%
rename from spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
rename to spark/src/test/scala/org/apache/comet/exec/CometExec3_4PlusSuite.scala
index 931f6900a..71060de64 100644
--- a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExec3_4PlusSuite.scala
@@ -20,15 +20,20 @@
 package org.apache.comet.exec
 
 import java.io.ByteArrayOutputStream
+
 import scala.util.Random
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
 import org.apache.spark.sql.{Column, CometTestBase}
-import org.apache.comet.CometConf
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.util.sketch.BloomFilter
-import org.scalactic.source.Position
-import org.scalatest.Tag
+
+import org.apache.comet.CometConf
+
 /**
  * This test suite contains tests for only Spark 3.4+.
  */
@@ -40,7 +45,8 @@ class CometExec3_4PlusSuite extends CometTestBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
     // Register 'might_contain' to builtin.
-    spark.sessionState.functionRegistry.registerFunction(func_might_contain,
+    spark.sessionState.functionRegistry.registerFunction(
+      func_might_contain,
       new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
       (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
   }
@@ -126,8 +132,7 @@ class CometExec3_4PlusSuite extends CometTestBase {
     withTable(table) {
       sql(s"create table $table(col1 long, col2 int) using parquet")
       sql(s"insert into $table values (201, 1)")
-      checkSparkAnswerAndOperator(
-        s"""
+      checkSparkAnswerAndOperator(s"""
           |SELECT might_contain(
           |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) FROM $table
            |""".stripMargin)
@@ -140,8 +145,7 @@ class CometExec3_4PlusSuite extends CometTestBase {
     withTable(table) {
       sql(s"create table $table(col1 long, col2 int) using parquet")
       sql(s"insert into $table values (201, 1), (null, 2)")
-      checkSparkAnswerAndOperator(
-        s"""
+      checkSparkAnswerAndOperator(s"""
            |SELECT might_contain(null, null) both_null,
            |       might_contain(null, 1L) null_bf,
            |       might_contain(
@@ -157,18 +161,26 @@ class CometExec3_4PlusSuite extends CometTestBase {
 
     withTable(table) {
       sql(s"create table $table(col1 long, col2 binary) using parquet")
-      spark.createDataset(longs).map(x => (x, bfBytes)).toDF("col1", "col2").write.insertInto(table)
-      val df = spark.table(table).select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
+      spark
+        .createDataset(longs)
+        .map(x => (x, bfBytes))
+        .toDF("col1", "col2")
+        .write
+        .insertInto(table)
+      val df = spark
+        .table(table)
+        .select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
       checkSparkAnswerAndOperator(df)
       // check with scalar subquery
-      checkSparkAnswerAndOperator(
-        s"""
+      checkSparkAnswerAndOperator(s"""
           |SELECT might_contain((select first(col2) as col2 from $table), col1) FROM $table
            |""".stripMargin)
     }
   }
 
-  private def bloomFilterFromRandomInput(expectedItems: Long, expectedBits: Long): (Seq[Long], Array[Byte]) = {
+  private def bloomFilterFromRandomInput(
+      expectedItems: Long,
+      expectedBits: Long): (Seq[Long], Array[Byte]) = {
     val bf = BloomFilter.create(expectedItems, expectedBits)
     val longs = (0 until expectedItems.toInt).map(_ => Random.nextLong())
     longs.foreach(bf.put)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 3b4a78690..91a8bdf8b 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -371,8 +371,7 @@ class CometExecSuite extends CometTestBase {
     Seq("true", "false").foreach { aqeEnabled =>
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
-        // `REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION` is a new config in Spark 3.3+.
-        "spark.sql.requireAllClusterKeysForDistribution" -> "true",
+        SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true",
         CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
         val df =
           Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value")
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index 6c5951426..5128a44f4 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -146,7 +146,7 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
         val (schema, output) = handleExceptions(getNormalizedQueryExecutionResult(spark, query))
         val queryString = query.trim
         val outputString = output.mkString("\n").replaceAll("\\s+$", "")
-        if (shouldRegenerateGoldenFiles) {
+        if (regenerateGoldenFiles) {
           val goldenOutput = {
             s"-- Automatically generated by ${getClass.getSimpleName}\n\n" +
               "-- !query schema\n" +
@@ -227,7 +227,7 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
   val allJoinConfCombinations: Seq[Map[String, String]] =
     Seq(sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf)
 
-  val joinConfs: Seq[Map[String, String]] = if (shouldRegenerateGoldenFiles) {
+  val joinConfs: Seq[Map[String, String]] = if (regenerateGoldenFiles) {
     require(
       !sys.env.contains("SPARK_TPCH_JOIN_CONF"),
       "'SPARK_TPCH_JOIN_CONF' cannot be set together with 'SPARK_GENERATE_GOLDEN_FILES'")
@@ -269,8 +269,4 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
   } else {
     ignore("skipped because env `SPARK_TPCH_DATA` is not set") {}
   }
-
-  // TODO: remove once Spark 3.3 is no longer supported
-  private def shouldRegenerateGoldenFiles: Boolean =
-    System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
 }
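
The suites now rely on the regenerateGoldenFiles flag inherited from Spark's shared test helpers, which is behaviourally the same as the private helper deleted above; a minimal sketch of that flag:

    object GoldenFileFlag {
      // Mirrors the deleted helper: regenerate when SPARK_GENERATE_GOLDEN_FILES=1.
      def regenerateGoldenFiles: Boolean =
        System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
    }
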
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 2855e553c..9f611612f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -282,7 +282,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
 
       assert(ValidateRequirements.validate(plan))
 
-      if (shouldRegenerateGoldenFiles) {
+      if (regenerateGoldenFiles) {
         generateGoldenFile(plan, query + suffix, explain)
       } else {
         checkWithApproved(plan, query + suffix, explain)
@@ -306,10 +306,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
 
     new TestSparkSession(new SparkContext("local[1]", this.getClass.getCanonicalName, conf))
   }
-
-  // TODO: remove once Spark 3.3 is no longer supported
-  private val shouldRegenerateGoldenFiles: Boolean =
-    System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
 }
 
 class CometTPCDSV1_4_PlanStabilitySuite extends CometPlanStabilitySuite {
diff --git a/spark/src/test/spark-pre-3.5/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala b/spark/src/test/spark-3.4/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
similarity index 100%
rename from spark/src/test/spark-pre-3.5/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
rename to spark/src/test/spark-3.4/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

