This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new be346c46e chore: Drop support for Spark 3.3 (EOL) (#1529)
be346c46e is described below

commit be346c46e9b2c36b3b949cdd639a9c71794e07f9
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 14 13:40:21 2025 -0600

    chore: Drop support for Spark 3.3 (EOL) (#1529)
---
 .github/workflows/pr_build.yml                     |   5 +-
 .../org/apache/comet/shims/ShimBatchReader.scala   |  36 -----
 .../comet/shims/ShimCastOverflowException.scala    |  32 -----
 .../spark/sql/comet/shims/ShimTaskMetrics.scala    |  29 ----
 pom.xml                                            |  14 --
 spark/pom.xml                                      |   1 -
 .../apache/comet/CometSparkSessionExtensions.scala |  16 +--
 .../org/apache/comet/parquet/ParquetFilters.scala  |  18 +--
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  51 ++-----
 .../main/scala/org/apache/comet/serde/arrays.scala |   4 +-
 .../org/apache/comet/shims/CometExprShim.scala     |  40 ------
 .../org/apache/comet/shims/CometExprShim.scala     |   6 -
 .../org/apache/comet/shims/CometExprShim.scala     |   6 -
 .../org/apache/comet/shims/CometExprShim.scala     |   6 -
 .../apache/comet/CometArrayExpressionSuite.scala   |   6 +-
 .../scala/org/apache/comet/CometCastSuite.scala    |  16 +--
 .../org/apache/comet/CometExpressionSuite.scala    |  35 ++---
 .../apache/comet/exec/CometAggregateSuite.scala    |   4 -
 .../org/apache/comet/exec/CometExecSuite.scala     |  22 +--
 .../org/apache/comet/exec/CometJoinSuite.scala     |   3 -
 .../comet/exec/CometNativeShuffleSuite.scala       |   5 -
 .../apache/comet/parquet/ParquetReadSuite.scala    |  68 +++------
 .../org/apache/spark/sql/CometTPCHQuerySuite.scala |   4 -
 .../scala/org/apache/spark/sql/CometTestBase.scala | 158 +++++++--------------
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |   5 +-
 .../apache/comet/CometExpression3_3PlusSuite.scala | 105 --------------
 .../apache/comet/exec/CometExec3_4PlusSuite.scala  |  85 ++++++++++-
 27 files changed, 194 insertions(+), 586 deletions(-)

diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index 5418320f5..255fc47c2 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -145,7 +145,7 @@ jobs:
         os: [ubuntu-latest]
         java_version: [8, 11, 17]
         test-target: [java]
-        spark-version: ['3.3', '3.4']
+        spark-version: ['3.4']
         scala-version: ['2.12', '2.13']
       fail-fast: false
     name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
@@ -283,11 +283,10 @@ jobs:
       matrix:
         java_version: [8, 17]
         test-target: [java]
-        spark-version: ['3.3', '3.4']
+        spark-version: ['3.4']
         scala-version: ['2.12', '2.13']
         exclude:
           - java_version: 8
-            spark-version: '3.3'
       fail-fast: false
     name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
     runs-on: macos-14
diff --git a/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala
deleted file mode 100644
index 3cbca896f..000000000
--- a/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-
-object ShimBatchReader {
-
-  def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
-    PartitionedFile(
-      partitionValues,
-      file,
-      -1, // -1 means we read the entire file
-      -1,
-      Array.empty[String],
-      0,
-      0)
-}
diff --git a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index 55373309b..000000000
--- a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-//       Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
-  extends SparkArithmeticException(
-    "CAST_OVERFLOW",
-    Array(t, s""""$from"""", s""""$to"""", toSQLConf(SQLConf.ANSI_ENABLED.key))
-  ) {}
diff --git a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala b/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
deleted file mode 100644
index 5b2a5fb5b..000000000
--- a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.AccumulatorV2
-
-object ShimTaskMetrics {
-
-  def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
-    taskMetrics.externalAccums.lastOption
-}
diff --git a/pom.xml b/pom.xml
index e236c1dd7..d5fec779c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,6 @@ under the License.
       -Djdk.reflect.useDirectMethodHandle=false
     </extraJavaTestArgs>
     <argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
-    <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
     <additional.3_4.test.source>spark-3.4-plus</additional.3_4.test.source>
     <additional.3_5.test.source>not-needed</additional.3_5.test.source>
     <additional.pre35.test.source>spark-pre-3.5</additional.pre35.test.source>
@@ -548,19 +547,6 @@ under the License.
       </properties>
     </profile>
 
-    <profile>
-      <id>spark-3.3</id>
-      <properties>
-        <scala.version>2.12.15</scala.version>
-        <spark.version>3.3.2</spark.version>
-        <spark.version.short>3.3</spark.version.short>
-        <parquet.version>1.12.0</parquet.version>
-        <slf4j.version>1.7.32</slf4j.version>
-        <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
-        <shims.minorVerSrc>spark-3.3</shims.minorVerSrc>
-      </properties>
-    </profile>
-
     <profile>
       <id>spark-3.4</id>
       <properties>
diff --git a/spark/pom.xml b/spark/pom.xml
index 46cc1c3c1..d37cbe2be 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -260,7 +260,6 @@ under the License.
             </goals>
             <configuration>
               <sources>
-                <source>src/test/${additional.3_3.test.source}</source>
                 <source>src/test/${additional.3_4.test.source}</source>
                 <source>src/test/${additional.3_5.test.source}</source>
                 <source>src/test/${additional.pre35.test.source}</source>
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 14af8ded9..613b3ac00 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.CometConf._
 import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.rules.RewriteJoin
 import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -747,8 +747,7 @@ class CometSparkSessionExtensions
           val newChildren = plan.children.map {
             case b: BroadcastExchangeExec
                 if isCometNative(b.child) &&
-                  CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) &&
-                  isSpark34Plus => // Spark 3.4+ only
+                  CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
               QueryPlanSerde.operator2Proto(b) match {
                 case Some(nativeOp) =>
                  val cometOp = CometBroadcastExchangeExec(b, b.output, b.child)
@@ -1235,8 +1234,6 @@ object CometSparkSessionExtensions extends Logging {
       Some(
         s"${COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.key}.enabled is not 
specified and " +
           s"${COMET_EXEC_BROADCAST_FORCE_ENABLED.key} is not specified")
-    } else if (!isSpark34Plus) {
-      Some("Native broadcast requires Spark 3.4 or newer")
     } else {
       None
     }
@@ -1335,15 +1332,6 @@ object CometSparkSessionExtensions extends Logging {
     }
   }
 
-  def isSpark33Plus: Boolean = {
-    org.apache.spark.SPARK_VERSION >= "3.3"
-  }
-
-  /** Used for operations that are available in Spark 3.4+ */
-  def isSpark34Plus: Boolean = {
-    org.apache.spark.SPARK_VERSION >= "3.4"
-  }
-
   def isSpark35Plus: Boolean = {
     org.apache.spark.SPARK_VERSION >= "3.5"
   }
diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index bcb23986f..30d7804e8 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulia
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
 
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 import org.apache.comet.shims.ShimSQLConf
 
 /**
@@ -620,17 +619,12 @@ class ParquetFilters(
     value == null || (nameToParquetField(name).fieldType match {
       case ParquetBooleanType => value.isInstanceOf[JBoolean]
       case ParquetByteType | ParquetShortType | ParquetIntegerType =>
-        if (isSpark34Plus) {
-          value match {
-            // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type
-            // Int. We don't create a filter if the value would overflow.
-            case _: JByte | _: JShort | _: Integer => true
-            case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
-            case _ => false
-          }
-        } else {
-          // If not Spark 3.4+, we still following the old behavior as Spark does.
-          value.isInstanceOf[Number]
+        value match {
+          // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type
+          // Int. We don't create a filter if the value would overflow.
+          case _: JByte | _: JShort | _: Integer => true
+          case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
+          case _ => false
         }
       case ParquetLongType => value.isInstanceOf[JLong]
       case ParquetFloatType => value.isInstanceOf[JFloat]
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a224cd6d3..a1cb5a732 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isSpark34Plus, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
 import org.apache.comet.expressions._
 import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
 import org.apache.comet.serde.ExprOuterClass.DataType._
@@ -63,10 +63,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
 
   def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-        _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
-        _: DateType | _: BooleanType | _: NullType =>
+        _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
+        _: DecimalType | _: DateType | _: BooleanType | _: NullType =>
       true
-    case dt if isTimestampNTZType(dt) => true
     case s: StructType if allowStruct =>
       s.fields.map(_.dataType).forall(supportedDataType(_, allowStruct))
     case dt =>
@@ -92,7 +91,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case _: BinaryType => 8
       case _: TimestampType => 9
       case _: DecimalType => 10
-      case dt if isTimestampNTZType(dt) => 11
+      case _: TimestampNTZType => 11
       case _: DateType => 12
       case _: NullType => 13
       case _: ArrayType => 14
@@ -585,8 +584,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         withInfo(sub, s"Unsupported datatype ${left.dataType}")
         None
 
-      case mul @ Multiply(left, right, _)
-          if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+      case mul @ Multiply(left, right, _) if supportedDataType(left.dataType) =>
         createMathExpression(
           expr,
           left,
@@ -601,13 +599,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         if (!supportedDataType(left.dataType)) {
           withInfo(mul, s"Unsupported datatype ${left.dataType}")
         }
-        if (decimalBeforeSpark34(left.dataType)) {
-          withInfo(mul, "Decimal support requires Spark 3.4 or later")
-        }
         None
 
-      case div @ Divide(left, right, _)
-          if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+      case div @ Divide(left, right, _) if supportedDataType(left.dataType) =>
         // Datafusion now throws an exception for dividing by zero
         // See https://github.com/apache/arrow-datafusion/pull/6792
         // For now, use NullIf to swap zeros with nulls.
@@ -627,13 +621,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         if (!supportedDataType(left.dataType)) {
           withInfo(div, s"Unsupported datatype ${left.dataType}")
         }
-        if (decimalBeforeSpark34(left.dataType)) {
-          withInfo(div, "Decimal support requires Spark 3.4 or later")
-        }
         None
 
-      case div @ IntegralDivide(left, right, _)
-          if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+      case div @ IntegralDivide(left, right, _) if supportedDataType(left.dataType) =>
         val rightExpr = nullIfWhenPrimitive(right)
 
         val dataType = (left.dataType, right.dataType) match {
@@ -680,13 +670,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         if (!supportedDataType(left.dataType)) {
           withInfo(div, s"Unsupported datatype ${left.dataType}")
         }
-        if (decimalBeforeSpark34(left.dataType)) {
-          withInfo(div, "Decimal support requires Spark 3.4 or later")
-        }
         None
 
-      case rem @ Remainder(left, right, _)
-          if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+      case rem @ Remainder(left, right, _) if supportedDataType(left.dataType) =>
         val rightExpr = nullIfWhenPrimitive(right)
 
         createMathExpression(
@@ -703,9 +689,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         if (!supportedDataType(left.dataType)) {
           withInfo(rem, s"Unsupported datatype ${left.dataType}")
         }
-        if (decimalBeforeSpark34(left.dataType)) {
-          withInfo(rem, "Decimal support requires Spark 3.4 or later")
-        }
         None
 
       case EqualTo(left, right) =>
@@ -798,6 +781,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
             case _: StringType =>
               exprBuilder.setStringVal(value.asInstanceOf[UTF8String].toString)
             case _: TimestampType => exprBuilder.setLongVal(value.asInstanceOf[Long])
+            case _: TimestampNTZType => exprBuilder.setLongVal(value.asInstanceOf[Long])
             case _: DecimalType =>
               // Pass decimal literal as bytes.
               val unscaled = value.asInstanceOf[Decimal].toBigDecimal.underlying.unscaledValue
@@ -808,8 +792,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
                 com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]])
               exprBuilder.setBytesVal(byteStr)
             case _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
-            case dt if isTimestampNTZType(dt) =>
-              exprBuilder.setLongVal(value.asInstanceOf[Long])
             case dt =>
               logWarning(s"Unexpected date type '$dt' for literal value 
'$value'")
           }
@@ -2242,7 +2224,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
         _: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType =>
       true
-    case dt if isTimestampNTZType(dt) => true
+    case TimestampNTZType => true
     case _ => false
   }
 
@@ -2804,16 +2786,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     }
   }
 
-  /**
-   * Checks whether `dt` is a decimal type AND whether Spark version is before 3.4
-   */
-  private def decimalBeforeSpark34(dt: DataType): Boolean = {
-    !isSpark34Plus && (dt match {
-      case _: DecimalType => true
-      case _ => false
-    })
-  }
-
   /**
    * Check if the datatypes of shuffle input are supported. This is used for Columnar shuffle
    * which supports struct/array.
@@ -2953,9 +2925,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       val canSort = sortOrder.head.dataType match {
         case _: BooleanType => true
        case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
-            _: DoubleType | _: TimestampType | _: DecimalType | _: DateType =>
+            _: DoubleType | _: TimestampType | _: TimestampType | _: DecimalType | _: DateType =>
           true
-        case dt if isTimestampNTZType(dt) => true
         case _: BinaryType | _: StringType => true
         case ArrayType(elementType, _) => canRank(elementType)
         case _ => false
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index 1ada81b97..60df51b08 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -33,9 +33,9 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim {
     import DataTypes._
     dt match {
       case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
-          _: DecimalType | DateType | TimestampType | StringType | BinaryType =>
+          _: DecimalType | DateType | TimestampType | TimestampNTZType | StringType |
+          BinaryType =>
         true
-      case t if isTimestampNTZType(t) => true
       case ArrayType(elementType, _) => isTypeSupported(elementType)
       case _: StructType =>
         // https://github.com/apache/datafusion-comet/issues/1307
diff --git a/spark/src/main/spark-3.3/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.3/org/apache/comet/shims/CometExprShim.scala
deleted file mode 100644
index aa6db06d8..000000000
--- a/spark/src/main/spark-3.3/org/apache/comet/shims/CometExprShim.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.comet.shims
-
-import org.apache.comet.expressions.CometEvalMode
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DataType
-
-/**
- * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
- */
-trait CometExprShim {
-    /**
-     * Returns a tuple of expressions for the `unhex` function.
-     */
-    protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
-        (unhex.child, Literal(false))
-    }
-
-    protected def isTimestampNTZType(dt: DataType): Boolean =
-        dt.typeName == "timestamp_ntz" // `TimestampNTZType` is private
-
-    protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalMode.fromBoolean(c.ansiEnabled)
-}
diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
index 7709957b4..5f4e3fba2 100644
--- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
@@ -20,7 +20,6 @@ package org.apache.comet.shims
 
 import org.apache.comet.expressions.CometEvalMode
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, TimestampNTZType}
 
 /**
  * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
@@ -33,11 +32,6 @@ trait CometExprShim {
         (unhex.child, Literal(unhex.failOnError))
     }
 
-    protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
-        case _: TimestampNTZType => true
-        case _ => false
-    }
-
     protected def evalMode(c: Cast): CometEvalMode.Value =
         CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
 }
diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
index 7709957b4..5f4e3fba2 100644
--- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
@@ -20,7 +20,6 @@ package org.apache.comet.shims
 
 import org.apache.comet.expressions.CometEvalMode
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, TimestampNTZType}
 
 /**
  * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
@@ -33,11 +32,6 @@ trait CometExprShim {
         (unhex.child, Literal(unhex.failOnError))
     }
 
-    protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
-        case _: TimestampNTZType => true
-        case _ => false
-    }
-
     protected def evalMode(c: Cast): CometEvalMode.Value =
         CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
 }
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
index 7709957b4..5f4e3fba2 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
@@ -20,7 +20,6 @@ package org.apache.comet.shims
 
 import org.apache.comet.expressions.CometEvalMode
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, TimestampNTZType}
 
 /**
  * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
@@ -33,11 +32,6 @@ trait CometExprShim {
         (unhex.child, Literal(unhex.failOnError))
     }
 
-    protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
-        case _: TimestampNTZType => true
-        case _ => false
-    }
-
     protected def evalMode(c: Cast): CometEvalMode.Value =
         CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
 }
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index f8d709dc6..8b3299dc9 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{array, col, expr, lit, udf}
 import org.apache.spark.sql.types.StructType
 
-import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark35Plus}
+import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 
 class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
@@ -135,7 +135,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
   }
 
   test("array_append") {
-    assume(isSpark34Plus)
     withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
@@ -182,7 +181,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
   }
 
   test("ArrayInsert") {
-    assume(isSpark34Plus)
     Seq(true, false).foreach(dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -206,7 +204,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
   test("ArrayInsertUnsupportedArgs") {
     // This test checks that the else branch in ArrayInsert
     // mapping to the comet is valid and fallback to spark is working fine.
-    assume(isSpark34Plus)
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, "test.parquet")
       makeParquetFileAllTypes(path, dictionaryEnabled = false, 10000)
@@ -296,7 +293,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
   }
 
   test("array_compact") {
-    assume(isSpark34Plus)
     withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
       Seq(true, false).foreach { dictionaryEnabled =>
         withTempDir { dir =>
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 208aef534..7531a9b9a 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -902,7 +902,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("cast TimestampType to LongType") {
     // https://github.com/apache/datafusion-comet/issues/1441
     assume(!CometConf.isExperimentalNativeScan)
-    assume(CometSparkSessionExtensions.isSpark33Plus)
     castTest(generateTimestampsExtended(), DataTypes.LongType)
   }
 
@@ -1224,22 +1223,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
                     sparkException.getMessage
                       .replace(".WITH_SUGGESTION] ", "]")
                       .startsWith(cometMessage))
-                } else if (CometSparkSessionExtensions.isSpark34Plus) {
+                } else {
+                  // for Spark 3.4 we expect to reproduce the error message exactly
                   assert(cometMessage == sparkMessage)
-                } else {
-                  // for Spark 3.3 we just need to strip the prefix from the Comet message
-                  // before comparing
-                  val cometMessageModified = cometMessage
-                    .replace("[CAST_INVALID_INPUT] ", "")
-                    .replace("[CAST_OVERFLOW] ", "")
-                    .replace("[NUMERIC_VALUE_OUT_OF_RANGE] ", "")
-
-                  if (sparkMessage.contains("cannot be represented as")) {
-                    assert(cometMessage.contains("cannot be represented as"))
-                  } else {
-                    assert(cometMessageModified == sparkMessage)
-                  }
                 }
               }
           }
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 7e8e34bfc..48da86a74 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.sql.types.{Decimal, DecimalType}
 
-import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
 
 class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -71,9 +71,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("decimals divide by zero") {
-    // TODO: enable Spark 3.3 tests after supporting decimal divide operation
-    assume(isSpark34Plus)
-
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false",
@@ -594,7 +591,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("date_trunc with format array") {
-    assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182")
     withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
       val numRows = 1000
       Seq(true, false).foreach { dictionaryEnabled =>
@@ -998,9 +994,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("decimals arithmetic and comparison") {
-    // TODO: enable Spark 3.3 tests after supporting decimal reminder operation
-    assume(isSpark34Plus)
-
     def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
       val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
       spark
@@ -1072,7 +1065,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("scalar decimal arithmetic operations") {
-    assume(isSpark34Plus)
     withTable("tbl") {
       withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
         sql("CREATE TABLE tbl (a INT) USING PARQUET")
@@ -1737,7 +1729,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("Decimal binary ops multiply is aligned to Spark") {
-    assume(isSpark34Plus)
     Seq(true, false).foreach { allowPrecisionLoss =>
       withSQLConf(
         "spark.sql.decimalOperations.allowPrecisionLoss" -> 
allowPrecisionLoss.toString) {
@@ -1795,7 +1786,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("Decimal random number tests") {
-    assume(isSpark34Plus) // Only Spark 3.4+ has the fix for SPARK-45786
     val rand = scala.util.Random
     def makeNum(p: Int, s: Int): String = {
       val int1 = rand.nextLong()
@@ -1860,7 +1850,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("explain comet") {
-    assume(isSpark34Plus)
     withSQLConf(
       SQLConf.ANSI_ENABLED.key -> "false",
       SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
@@ -2716,17 +2705,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
                   | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
                   | order by t1._id""".stripMargin)
 
-              if (isSpark34Plus) {
-                // decimal support requires Spark 3.4 or later
-                checkSparkAnswerAndOperator("""
-                    |select
-                    | t1._12 div t2._12, div(t1._12, t2._12),
-                    | t1._15 div t2._15, div(t1._15, t2._15),
-                    | t1._16 div t2._16, div(t1._16, t2._16),
-                    | t1._17 div t2._17, div(t1._17, t2._17)
-                    | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
-                    | order by t1._id""".stripMargin)
-              }
+              // decimal support requires Spark 3.4 or later
+              checkSparkAnswerAndOperator("""
+                  |select
+                  | t1._12 div t2._12, div(t1._12, t2._12),
+                  | t1._15 div t2._15, div(t1._15, t2._15),
+                  | t1._16 div t2._16, div(t1._16, t2._16),
+                  | t1._17 div t2._17, div(t1._17, t2._17)
+                  | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+                  | order by t1._id""".stripMargin)
             }
           }
         }
@@ -2735,8 +2722,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("test integral divide overflow for decimal") {
-    // decimal support requires Spark 3.4 or later
-    assume(isSpark34Plus)
     if (isSpark40Plus) {
       Seq(true, false)
     } else
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 3215a984e..2f957b606 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.functions.{count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 
 /**
@@ -884,9 +883,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("final decimal avg") {
-    // TODO: enable decimal average for Spark 3.3
-    assume(isSpark34Plus)
-
     withSQLConf(
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
       CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index c02c9ce5e..3b4a78690 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark35Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 
 class CometExecSuite extends CometTestBase {
@@ -260,7 +260,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(
       CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
@@ -290,7 +289,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(
       CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -370,7 +368,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Repeated shuffle exchange don't fail") {
-    assume(isSpark33Plus)
     Seq("true", "false").foreach { aqeEnabled =>
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
@@ -392,7 +389,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("try_sum should return null if overflow happens before merging") {
-    assume(isSpark33Plus, "try_sum is available in Spark 3.3+")
     val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v")
     val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2)
       .map(Period.ofMonths)
@@ -419,7 +415,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("CometBroadcastExchangeExec") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true") {
       withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
         withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
@@ -445,7 +440,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("CometBroadcastExchangeExec: empty broadcast") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true") {
       withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
         withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
@@ -663,7 +657,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Comet native metrics: BroadcastHashJoin") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
       withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
         val df = sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON 
t1._1 = t2._1")
@@ -1585,7 +1578,6 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Fallback to Spark for TakeOrderedAndProjectExec with offset") {
-    assume(isSpark34Plus)
     Seq("true", "false").foreach(aqeEnabled =>
       withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
         withTable("t1") {
@@ -1702,13 +1694,11 @@ class CometExecSuite extends CometTestBase {
 
                // Verify that the BatchScanExec nodes supported columnar output when requested for Spark 3.4+.
                 // Earlier versions support columnar output for fewer type.
-                if (isSpark34Plus) {
-                  val leaves = df.queryExecution.executedPlan.collectLeaves()
-                  if (parquetVectorized && isSpark34Plus) {
-                    assert(leaves.forall(_.supportsColumnar))
-                  } else {
-                    assert(!leaves.forall(_.supportsColumnar))
-                  }
+                val leaves = df.queryExecution.executedPlan.collectLeaves()
+                if (parquetVectorized) {
+                  assert(leaves.forall(_.supportsColumnar))
+                } else {
+                  assert(!leaves.forall(_.supportsColumnar))
                 }
               }
             }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index e68d63ef1..e0a873dd1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.Decimal
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 
 class CometJoinSuite extends CometTestBase {
   import testImplicits._
@@ -75,7 +74,6 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin without join filter") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -103,7 +101,6 @@ class CometJoinSuite extends CometTestBase {
   }
 
   test("Broadcast HashJoin with join filter") {
-    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(
       CometConf.COMET_BATCH_SIZE.key -> "100",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index 329688fc5..5b58befcd 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.col
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 
 class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
@@ -64,10 +63,6 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
           val path = new Path(dir.toURI.toString, "test.parquet")
          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
           var allTypes: Seq[Int] = (1 to 20)
-          if (!isSpark34Plus) {
-            // TODO: Remove this once after https://github.com/apache/arrow/issues/40038 is fixed
-            allTypes = allTypes.filterNot(Set(14).contains)
-          }
           allTypes.map(i => s"_$i").foreach { c =>
             withSQLConf(
               CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 4099759cd..6e9b731d4 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -46,7 +46,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import com.google.common.primitives.UnsignedLong
 
 import org.apache.comet.{CometConf, CometSparkSessionExtensions}
-import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark40Plus, usingDataFusionParquetExec}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, usingDataFusionParquetExec}
 
 abstract class ParquetReadSuite extends CometTestBase {
   import testImplicits._
@@ -345,45 +345,24 @@ abstract class ParquetReadSuite extends CometTestBase {
         n: Int,
         pageSize: Int): Seq[Option[Int]] = {
       val schemaStr = {
-        if (isSpark34Plus) {
-          """
-            |message root {
-            |  optional boolean                 _1;
-            |  optional int32                   _2(INT_8);
-            |  optional int32                   _3(INT_16);
-            |  optional int32                   _4;
-            |  optional int64                   _5;
-            |  optional float                   _6;
-            |  optional double                  _7;
-            |  optional binary                  _8(UTF8);
-            |  optional int32                   _9(UINT_8);
-            |  optional int32                   _10(UINT_16);
-            |  optional int32                   _11(UINT_32);
-            |  optional int64                   _12(UINT_64);
-            |  optional binary                  _13(ENUM);
-            |  optional FIXED_LEN_BYTE_ARRAY(3) _14;
-            |}
-        """.stripMargin
-        } else {
-          """
-            |message root {
-            |  optional boolean                 _1;
-            |  optional int32                   _2(INT_8);
-            |  optional int32                   _3(INT_16);
-            |  optional int32                   _4;
-            |  optional int64                   _5;
-            |  optional float                   _6;
-            |  optional double                  _7;
-            |  optional binary                  _8(UTF8);
-            |  optional int32                   _9(UINT_8);
-            |  optional int32                   _10(UINT_16);
-            |  optional int32                   _11(UINT_32);
-            |  optional int64                   _12(UINT_64);
-            |  optional binary                  _13(ENUM);
-            |  optional binary                  _14(UTF8);
-            |}
-        """.stripMargin
-        }
+        """
+          |message root {
+          |  optional boolean                 _1;
+          |  optional int32                   _2(INT_8);
+          |  optional int32                   _3(INT_16);
+          |  optional int32                   _4;
+          |  optional int64                   _5;
+          |  optional float                   _6;
+          |  optional double                  _7;
+          |  optional binary                  _8(UTF8);
+          |  optional int32                   _9(UINT_8);
+          |  optional int32                   _10(UINT_16);
+          |  optional int32                   _11(UINT_32);
+          |  optional int64                   _12(UINT_64);
+          |  optional binary                  _13(ENUM);
+          |  optional FIXED_LEN_BYTE_ARRAY(3) _14;
+          |}
+      """.stripMargin
       }
 
       val schema = MessageTypeParser.parseMessageType(schemaStr)
@@ -441,11 +420,7 @@ abstract class ParquetReadSuite extends CometTestBase {
                Row(null, null, null, null, null, null, null, null, null, null, null, null, null,
                   null)
               case Some(i) =>
-                val flba_field = if (isSpark34Plus) {
-                  Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii
-                } else {
-                  (i % 10).toString * 3
-                }
+                val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii
                 Row(
                   i % 2 == 0,
                   i.toByte,
@@ -964,7 +939,6 @@ abstract class ParquetReadSuite extends CometTestBase {
   }
 
   test("FIXED_LEN_BYTE_ARRAY support") {
-    assume(isSpark34Plus)
     Seq(true, false).foreach { dictionaryEnabled =>
       def makeRawParquetFile(path: Path): Unit = {
         val schemaStr =
@@ -1194,7 +1168,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   test("row group skipping doesn't overflow when reading into larger type") {
     // Spark 4.0 no longer fails for widening types SPARK-40876
     // https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
-    assume(isSpark34Plus && !isSpark40Plus && 
!usingDataFusionParquetExec(conf))
+    assume(!isSpark40Plus && !usingDataFusionParquetExec(conf))
     withTempPath { path =>
       Seq(0).toDF("a").write.parquet(path.toString)
       // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index b9fc56b94..6c5951426 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.TestSparkSession
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 import org.apache.comet.shims.ShimCometTPCHQuerySuite
 
 /**
@@ -259,9 +258,6 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
           s"tpch/$name.sql",
           classLoader = Thread.currentThread().getContextClassLoader)
         test(name) {
-          // Only run the tests in Spark 3.4+
-          assume(isSpark34Plus)
-
           val goldenFile = new File(s"$baseResourcePath", s"$name.sql.out")
           joinConfs.foreach { conf =>
            System.gc() // Workaround for GitHub Actions memory limitation, see also SPARK-37368
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 10c5ca210..7bdec4215 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -47,7 +47,6 @@ import org.apache.spark.sql.test._
 import org.apache.spark.sql.types.{DecimalType, StructType}
 
 import org.apache.comet._
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 import org.apache.comet.shims.ShimCometSparkSessionExtensions
 
 /**
@@ -440,114 +439,57 @@ abstract class CometTestBase
       // here: https://github.com/apache/parquet-java/issues/3142
       // here: https://github.com/apache/arrow-rs/issues/7040
       // and here: https://github.com/apache/datafusion-comet/issues/1348
-      if (isSpark34Plus) {
-        """
-         |message root {
-         |  optional boolean                  _1;
-         |  optional int32                    _2(INT_8);
-         |  optional int32                    _3(INT_16);
-         |  optional int32                    _4;
-         |  optional int64                    _5;
-         |  optional float                    _6;
-         |  optional double                   _7;
-         |  optional binary                   _8(UTF8);
-         |  optional int32                    _9(UINT_32);
-         |  optional int32                    _10(UINT_32);
-         |  optional int32                    _11(UINT_32);
-         |  optional int64                    _12(UINT_64);
-         |  optional binary                   _13(ENUM);
-         |  optional FIXED_LEN_BYTE_ARRAY(3)  _14;
-         |  optional int32                    _15(DECIMAL(5, 2));
-         |  optional int64                    _16(DECIMAL(18, 10));
-         |  optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
-         |  optional INT64                    _18(TIMESTAMP(MILLIS,true));
-         |  optional INT64                    _19(TIMESTAMP(MICROS,true));
-         |  optional INT32                    _20(DATE);
-         |  optional INT32                    _id;
-         |}
-        """.stripMargin
-      } else {
-        """
-         |message root {
-         |  optional boolean                  _1;
-         |  optional int32                    _2(INT_8);
-         |  optional int32                    _3(INT_16);
-         |  optional int32                    _4;
-         |  optional int64                    _5;
-         |  optional float                    _6;
-         |  optional double                   _7;
-         |  optional binary                   _8(UTF8);
-         |  optional int32                    _9(UINT_32);
-         |  optional int32                    _10(UINT_32);
-         |  optional int32                    _11(UINT_32);
-         |  optional int64                    _12(UINT_64);
-         |  optional binary                   _13(ENUM);
-         |  optional binary                   _14(UTF8);
-         |  optional int32                    _15(DECIMAL(5, 2));
-         |  optional int64                    _16(DECIMAL(18, 10));
-         |  optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
-         |  optional INT64                    _18(TIMESTAMP(MILLIS,true));
-         |  optional INT64                    _19(TIMESTAMP(MICROS,true));
-         |  optional INT32                    _20(DATE);
-         |  optional INT32                    _id;
-         |}
-        """.stripMargin
-      }
+      """
+       |message root {
+       |  optional boolean                  _1;
+       |  optional int32                    _2(INT_8);
+       |  optional int32                    _3(INT_16);
+       |  optional int32                    _4;
+       |  optional int64                    _5;
+       |  optional float                    _6;
+       |  optional double                   _7;
+       |  optional binary                   _8(UTF8);
+       |  optional int32                    _9(UINT_32);
+       |  optional int32                    _10(UINT_32);
+       |  optional int32                    _11(UINT_32);
+       |  optional int64                    _12(UINT_64);
+       |  optional binary                   _13(ENUM);
+       |  optional FIXED_LEN_BYTE_ARRAY(3)  _14;
+       |  optional int32                    _15(DECIMAL(5, 2));
+       |  optional int64                    _16(DECIMAL(18, 10));
+       |  optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
+       |  optional INT64                    _18(TIMESTAMP(MILLIS,true));
+       |  optional INT64                    _19(TIMESTAMP(MICROS,true));
+       |  optional INT32                    _20(DATE);
+       |  optional INT32                    _id;
+       |}
+      """.stripMargin
     } else {
-
-      if (isSpark34Plus) {
-        """
-         |message root {
-         |  optional boolean                  _1;
-         |  optional int32                    _2(INT_8);
-         |  optional int32                    _3(INT_16);
-         |  optional int32                    _4;
-         |  optional int64                    _5;
-         |  optional float                    _6;
-         |  optional double                   _7;
-         |  optional binary                   _8(UTF8);
-         |  optional int32                    _9(UINT_8);
-         |  optional int32                    _10(UINT_16);
-         |  optional int32                    _11(UINT_32);
-         |  optional int64                    _12(UINT_64);
-         |  optional binary                   _13(ENUM);
-         |  optional FIXED_LEN_BYTE_ARRAY(3)  _14;
-         |  optional int32                    _15(DECIMAL(5, 2));
-         |  optional int64                    _16(DECIMAL(18, 10));
-         |  optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
-         |  optional INT64                    _18(TIMESTAMP(MILLIS,true));
-         |  optional INT64                    _19(TIMESTAMP(MICROS,true));
-         |  optional INT32                    _20(DATE);
-         |  optional INT32                    _id;
-         |}
-        """.stripMargin
-      } else {
-        """
-         |message root {
-         |  optional boolean                  _1;
-         |  optional int32                    _2(INT_8);
-         |  optional int32                    _3(INT_16);
-         |  optional int32                    _4;
-         |  optional int64                    _5;
-         |  optional float                    _6;
-         |  optional double                   _7;
-         |  optional binary                   _8(UTF8);
-         |  optional int32                    _9(UINT_8);
-         |  optional int32                    _10(UINT_16);
-         |  optional int32                    _11(UINT_32);
-         |  optional int64                    _12(UINT_64);
-         |  optional binary                   _13(ENUM);
-         |  optional binary                   _14(UTF8);
-         |  optional int32                    _15(DECIMAL(5, 2));
-         |  optional int64                    _16(DECIMAL(18, 10));
-         |  optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
-         |  optional INT64                    _18(TIMESTAMP(MILLIS,true));
-         |  optional INT64                    _19(TIMESTAMP(MICROS,true));
-         |  optional INT32                    _20(DATE);
-         |  optional INT32                    _id;
-         |}
-        """.stripMargin
-      }
+      """
+       |message root {
+       |  optional boolean                  _1;
+       |  optional int32                    _2(INT_8);
+       |  optional int32                    _3(INT_16);
+       |  optional int32                    _4;
+       |  optional int64                    _5;
+       |  optional float                    _6;
+       |  optional double                   _7;
+       |  optional binary                   _8(UTF8);
+       |  optional int32                    _9(UINT_8);
+       |  optional int32                    _10(UINT_16);
+       |  optional int32                    _11(UINT_32);
+       |  optional int64                    _12(UINT_64);
+       |  optional binary                   _13(ENUM);
+       |  optional FIXED_LEN_BYTE_ARRAY(3)  _14;
+       |  optional int32                    _15(DECIMAL(5, 2));
+       |  optional int64                    _16(DECIMAL(18, 10));
+       |  optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
+       |  optional INT64                    _18(TIMESTAMP(MILLIS,true));
+       |  optional INT64                    _19(TIMESTAMP(MICROS,true));
+       |  optional INT32                    _20(DATE);
+       |  optional INT32                    _id;
+       |}
+      """.stripMargin
     }
   }
 
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index f4c310d8f..2855e553c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.TestSparkSession
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark35Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
 
 /**
  * Similar to [[org.apache.spark.sql.PlanStabilitySuite]], checks that TPC-DS Comet plans don't
@@ -255,9 +255,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
    * matches a golden file or it will create a new golden file.
    */
   protected def testQuery(tpcdsGroup: String, query: String, suffix: String = ""): Unit = {
-    // Only run the tests in Spark 3.4+
-    assume(isSpark34Plus)
-
     val queryString = resourceToString(
       s"$tpcdsGroup/$query.sql",
       classLoader = Thread.currentThread().getContextClassLoader)
diff --git a/spark/src/test/spark-3.3-plus/org/apache/comet/CometExpression3_3PlusSuite.scala b/spark/src/test/spark-3.3-plus/org/apache/comet/CometExpression3_3PlusSuite.scala
deleted file mode 100644
index d54c8dad0..000000000
--- a/spark/src/test/spark-3.3-plus/org/apache/comet/CometExpression3_3PlusSuite.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet
-
-import org.apache.spark.sql.{Column, CometTestBase}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
-import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.util.sketch.BloomFilter
-import java.io.ByteArrayOutputStream
-import scala.util.Random
-
-class CometExpression3_3PlusSuite extends CometTestBase with AdaptiveSparkPlanHelper {
-  import testImplicits._
-
-  val func_might_contain = new FunctionIdentifier("might_contain")
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    // Register 'might_contain' to builtin.
-    spark.sessionState.functionRegistry.registerFunction(func_might_contain,
-      new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
-      (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
-  }
-
-  override def afterAll(): Unit = {
-    spark.sessionState.functionRegistry.dropFunction(func_might_contain)
-    super.afterAll()
-  }
-
-  test("test BloomFilterMightContain can take a constant value input") {
-    val table = "test"
-
-    withTable(table) {
-      sql(s"create table $table(col1 long, col2 int) using parquet")
-      sql(s"insert into $table values (201, 1)")
-      checkSparkAnswerAndOperator(
-        s"""
-          |SELECT might_contain(
-          |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) FROM $table
-          |""".stripMargin)
-    }
-  }
-
-  test("test NULL inputs for BloomFilterMightContain") {
-    val table = "test"
-
-    withTable(table) {
-      sql(s"create table $table(col1 long, col2 int) using parquet")
-      sql(s"insert into $table values (201, 1), (null, 2)")
-      checkSparkAnswerAndOperator(
-        s"""
-           |SELECT might_contain(null, null) both_null,
-           |       might_contain(null, 1L) null_bf,
-           |       might_contain(
-           |         X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) null_value
-           |       FROM $table
-           |""".stripMargin)
-    }
-  }
-
-  test("test BloomFilterMightContain from random input") {
-    val (longs, bfBytes) = bloomFilterFromRandomInput(10000, 10000)
-    val table = "test"
-
-    withTable(table) {
-      sql(s"create table $table(col1 long, col2 binary) using parquet")
-      spark.createDataset(longs).map(x => (x, bfBytes)).toDF("col1", "col2").write.insertInto(table)
-      val df = spark.table(table).select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
-      checkSparkAnswerAndOperator(df)
-      // check with scalar subquery
-      checkSparkAnswerAndOperator(
-        s"""
-          |SELECT might_contain((select first(col2) as col2 from $table), col1) FROM $table
-          |""".stripMargin)
-    }
-  }
-
-  private def bloomFilterFromRandomInput(expectedItems: Long, expectedBits: Long): (Seq[Long], Array[Byte]) = {
-    val bf = BloomFilter.create(expectedItems, expectedBits)
-    val longs = (0 until expectedItems.toInt).map(_ => Random.nextLong())
-    longs.foreach(bf.put)
-    val os = new ByteArrayOutputStream()
-    bf.writeTo(os)
-    (longs, os.toByteArray)
-  }
-}
diff --git a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala b/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
index 764f7b18d..931f6900a 100644
--- a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
+++ b/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
@@ -19,18 +19,37 @@
 
 package org.apache.comet.exec
 
+import java.io.ByteArrayOutputStream
+import scala.util.Random
+import org.apache.spark.sql.{Column, CometTestBase}
+import org.apache.comet.CometConf
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.util.sketch.BloomFilter
 import org.scalactic.source.Position
 import org.scalatest.Tag
-
-import org.apache.spark.sql.CometTestBase
-import org.apache.comet.CometConf
-
 /**
  * This test suite contains tests for only Spark 3.4+.
  */
 class CometExec3_4PlusSuite extends CometTestBase {
   import testImplicits._
 
+  val func_might_contain = new FunctionIdentifier("might_contain")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // Register 'might_contain' to builtin.
+    spark.sessionState.functionRegistry.registerFunction(func_might_contain,
+      new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
+      (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
+  }
+
+  override def afterAll(): Unit = {
+    spark.sessionState.functionRegistry.dropFunction(func_might_contain)
+    super.afterAll()
+  }
+
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
     super.test(testName, testTags: _*) {
@@ -100,4 +119,62 @@ class CometExec3_4PlusSuite extends CometTestBase {
       checkSparkAnswer(mapData.toDF().offset(99))
     }
   }
+
+  test("test BloomFilterMightContain can take a constant value input") {
+    val table = "test"
+
+    withTable(table) {
+      sql(s"create table $table(col1 long, col2 int) using parquet")
+      sql(s"insert into $table values (201, 1)")
+      checkSparkAnswerAndOperator(
+        s"""
+           |SELECT might_contain(
+           |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) FROM $table
+           |""".stripMargin)
+    }
+  }
+
+  test("test NULL inputs for BloomFilterMightContain") {
+    val table = "test"
+
+    withTable(table) {
+      sql(s"create table $table(col1 long, col2 int) using parquet")
+      sql(s"insert into $table values (201, 1), (null, 2)")
+      checkSparkAnswerAndOperator(
+        s"""
+           |SELECT might_contain(null, null) both_null,
+           |       might_contain(null, 1L) null_bf,
+           |       might_contain(
+           |         X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) null_value
+           |       FROM $table
+           |""".stripMargin)
+    }
+  }
+
+  test("test BloomFilterMightContain from random input") {
+    val (longs, bfBytes) = bloomFilterFromRandomInput(10000, 10000)
+    val table = "test"
+
+    withTable(table) {
+      sql(s"create table $table(col1 long, col2 binary) using parquet")
+      spark.createDataset(longs).map(x => (x, bfBytes)).toDF("col1", "col2").write.insertInto(table)
+      val df = spark.table(table).select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
+      checkSparkAnswerAndOperator(df)
+      // check with scalar subquery
+      checkSparkAnswerAndOperator(
+        s"""
+           |SELECT might_contain((select first(col2) as col2 from $table), col1) FROM $table
+           |""".stripMargin)
+    }
+  }
+
+  private def bloomFilterFromRandomInput(expectedItems: Long, expectedBits: Long): (Seq[Long], Array[Byte]) = {
+    val bf = BloomFilter.create(expectedItems, expectedBits)
+    val longs = (0 until expectedItems.toInt).map(_ => Random.nextLong())
+    longs.foreach(bf.put)
+    val os = new ByteArrayOutputStream()
+    bf.writeTo(os)
+    (longs, os.toByteArray)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
