This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new be346c46e chore: Drop support for Spark 3.3 (EOL) (#1529)
be346c46e is described below
commit be346c46e9b2c36b3b949cdd639a9c71794e07f9
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 14 13:40:21 2025 -0600
chore: Drop support for Spark 3.3 (EOL) (#1529)
---
.github/workflows/pr_build.yml | 5 +-
.../org/apache/comet/shims/ShimBatchReader.scala | 36 -----
.../comet/shims/ShimCastOverflowException.scala | 32 -----
.../spark/sql/comet/shims/ShimTaskMetrics.scala | 29 ----
pom.xml | 14 --
spark/pom.xml | 1 -
.../apache/comet/CometSparkSessionExtensions.scala | 16 +--
.../org/apache/comet/parquet/ParquetFilters.scala | 18 +--
.../org/apache/comet/serde/QueryPlanSerde.scala | 51 ++-----
.../main/scala/org/apache/comet/serde/arrays.scala | 4 +-
.../org/apache/comet/shims/CometExprShim.scala | 40 ------
.../org/apache/comet/shims/CometExprShim.scala | 6 -
.../org/apache/comet/shims/CometExprShim.scala | 6 -
.../org/apache/comet/shims/CometExprShim.scala | 6 -
.../apache/comet/CometArrayExpressionSuite.scala | 6 +-
.../scala/org/apache/comet/CometCastSuite.scala | 16 +--
.../org/apache/comet/CometExpressionSuite.scala | 35 ++---
.../apache/comet/exec/CometAggregateSuite.scala | 4 -
.../org/apache/comet/exec/CometExecSuite.scala | 22 +--
.../org/apache/comet/exec/CometJoinSuite.scala | 3 -
.../comet/exec/CometNativeShuffleSuite.scala | 5 -
.../apache/comet/parquet/ParquetReadSuite.scala | 68 +++------
.../org/apache/spark/sql/CometTPCHQuerySuite.scala | 4 -
.../scala/org/apache/spark/sql/CometTestBase.scala | 158 +++++++--------------
.../spark/sql/comet/CometPlanStabilitySuite.scala | 5 +-
.../apache/comet/CometExpression3_3PlusSuite.scala | 105 --------------
.../apache/comet/exec/CometExec3_4PlusSuite.scala | 85 ++++++++++-
27 files changed, 194 insertions(+), 586 deletions(-)
diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index 5418320f5..255fc47c2 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -145,7 +145,7 @@ jobs:
os: [ubuntu-latest]
java_version: [8, 11, 17]
test-target: [java]
- spark-version: ['3.3', '3.4']
+ spark-version: ['3.4']
scala-version: ['2.12', '2.13']
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
@@ -283,11 +283,10 @@ jobs:
matrix:
java_version: [8, 17]
test-target: [java]
- spark-version: ['3.3', '3.4']
+ spark-version: ['3.4']
scala-version: ['2.12', '2.13']
exclude:
- java_version: 8
- spark-version: '3.3'
fail-fast: false
name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: macos-14
diff --git a/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala
deleted file mode 100644
index 3cbca896f..000000000
--- a/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-
-object ShimBatchReader {
-
- def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
- PartitionedFile(
- partitionValues,
- file,
- -1, // -1 means we read the entire file
- -1,
- Array.empty[String],
- 0,
- 0)
-}
diff --git a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index 55373309b..000000000
--- a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-// Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
- extends SparkArithmeticException(
- "CAST_OVERFLOW",
- Array(t, s""""$from"""", s""""$to"""", toSQLConf(SQLConf.ANSI_ENABLED.key))
- ) {}
diff --git a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala b/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
deleted file mode 100644
index 5b2a5fb5b..000000000
--- a/common/src/main/spark-3.3/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.AccumulatorV2
-
-object ShimTaskMetrics {
-
- def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
- taskMetrics.externalAccums.lastOption
-}
diff --git a/pom.xml b/pom.xml
index e236c1dd7..d5fec779c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,6 @@ under the License.
-Djdk.reflect.useDirectMethodHandle=false
</extraJavaTestArgs>
<argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
- <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
<additional.3_4.test.source>spark-3.4-plus</additional.3_4.test.source>
<additional.3_5.test.source>not-needed</additional.3_5.test.source>
<additional.pre35.test.source>spark-pre-3.5</additional.pre35.test.source>
@@ -548,19 +547,6 @@ under the License.
</properties>
</profile>
- <profile>
- <id>spark-3.3</id>
- <properties>
- <scala.version>2.12.15</scala.version>
- <spark.version>3.3.2</spark.version>
- <spark.version.short>3.3</spark.version.short>
- <parquet.version>1.12.0</parquet.version>
- <slf4j.version>1.7.32</slf4j.version>
- <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
- <shims.minorVerSrc>spark-3.3</shims.minorVerSrc>
- </properties>
- </profile>
-
<profile>
<id>spark-3.4</id>
<properties>
diff --git a/spark/pom.xml b/spark/pom.xml
index 46cc1c3c1..d37cbe2be 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -260,7 +260,6 @@ under the License.
</goals>
<configuration>
<sources>
- <source>src/test/${additional.3_3.test.source}</source>
<source>src/test/${additional.3_4.test.source}</source>
<source>src/test/${additional.3_5.test.source}</source>
<source>src/test/${additional.pre35.test.source}</source>
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 14af8ded9..613b3ac00 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.comet.CometConf._
import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.rules.RewriteJoin
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -747,8 +747,7 @@ class CometSparkSessionExtensions
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
- CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) &&
- isSpark34Plus => // Spark 3.4+ only
+ CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
QueryPlanSerde.operator2Proto(b) match {
case Some(nativeOp) =>
val cometOp = CometBroadcastExchangeExec(b, b.output, b.child)
@@ -1235,8 +1234,6 @@ object CometSparkSessionExtensions extends Logging {
Some(
s"${COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.key}.enabled is not
specified and " +
s"${COMET_EXEC_BROADCAST_FORCE_ENABLED.key} is not specified")
- } else if (!isSpark34Plus) {
- Some("Native broadcast requires Spark 3.4 or newer")
} else {
None
}
@@ -1335,15 +1332,6 @@ object CometSparkSessionExtensions extends Logging {
}
}
- def isSpark33Plus: Boolean = {
- org.apache.spark.SPARK_VERSION >= "3.3"
- }
-
- /** Used for operations that are available in Spark 3.4+ */
- def isSpark34Plus: Boolean = {
- org.apache.spark.SPARK_VERSION >= "3.4"
- }
-
def isSpark35Plus: Boolean = {
org.apache.spark.SPARK_VERSION >= "3.5"
}
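For context, the isSpark35Plus / isSpark40Plus helpers retained above gate features on a plain lexicographic comparison of org.apache.spark.SPARK_VERSION. A minimal standalone sketch of that pattern (hypothetical object name, hard-coded version string standing in for SPARK_VERSION):

    object VersionGateSketch {
      // Stand-in for org.apache.spark.SPARK_VERSION, illustrative only.
      val sparkVersion: String = "3.5.1"

      // Same idea as the retained helpers: Scala's String ordering makes
      // "3.5.1" >= "3.5" true and "3.5.1" >= "4.0" false.
      def isSpark35Plus: Boolean = sparkVersion >= "3.5"
      def isSpark40Plus: Boolean = sparkVersion >= "4.0"

      def main(args: Array[String]): Unit =
        println(s"isSpark35Plus=$isSpark35Plus, isSpark40Plus=$isSpark40Plus")
    }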
diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index bcb23986f..30d7804e8 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulia
import org.apache.spark.sql.sources
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.shims.ShimSQLConf
/**
@@ -620,17 +619,12 @@ class ParquetFilters(
value == null || (nameToParquetField(name).fieldType match {
case ParquetBooleanType => value.isInstanceOf[JBoolean]
case ParquetByteType | ParquetShortType | ParquetIntegerType =>
- if (isSpark34Plus) {
- value match {
- // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type
- // Int. We don't create a filter if the value would overflow.
- case _: JByte | _: JShort | _: Integer => true
- case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
- case _ => false
- }
- } else {
- // If not Spark 3.4+, we still following the old behavior as Spark does.
- value.isInstanceOf[Number]
+ value match {
+ // Byte/Short/Int are all stored as INT32 in Parquet so filters are built using type
+ // Int. We don't create a filter if the value would overflow.
+ case _: JByte | _: JShort | _: Integer => true
+ case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
+ case _ => false
}
case ParquetLongType => value.isInstanceOf[JLong]
case ParquetFloatType => value.isInstanceOf[JFloat]
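The overflow guard kept above can be read in isolation; a small self-contained sketch (hypothetical names, not the ParquetFilters API):

    object Int32FilterCheckSketch {
      // Byte/Short/Int columns are physically INT32 in Parquet, so a pushed-down
      // comparison against a Long value is only safe when the value fits in Int.
      def canBuildInt32Filter(value: Any): Boolean = value match {
        case _: java.lang.Byte | _: java.lang.Short | _: java.lang.Integer => true
        case v: java.lang.Long =>
          v.longValue() >= Int.MinValue && v.longValue() <= Int.MaxValue
        case _ => false
      }

      def main(args: Array[String]): Unit = {
        println(canBuildInt32Filter(Long.box(42L)))      // true: fits in INT32
        println(canBuildInt32Filter(Long.box(1L << 40))) // false: would overflow, so no filter is built
      }
    }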
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a224cd6d3..a1cb5a732 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isSpark34Plus, withInfo}
+import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo}
import org.apache.comet.expressions._
import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc}
import org.apache.comet.serde.ExprOuterClass.DataType._
@@ -63,10 +63,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean =
dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
- _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
- _: DateType | _: BooleanType | _: NullType =>
+ _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
+ _: DecimalType | _: DateType | _: BooleanType | _: NullType =>
true
- case dt if isTimestampNTZType(dt) => true
case s: StructType if allowStruct =>
s.fields.map(_.dataType).forall(supportedDataType(_, allowStruct))
case dt =>
@@ -92,7 +91,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case _: BinaryType => 8
case _: TimestampType => 9
case _: DecimalType => 10
- case dt if isTimestampNTZType(dt) => 11
+ case _: TimestampNTZType => 11
case _: DateType => 12
case _: NullType => 13
case _: ArrayType => 14
@@ -585,8 +584,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(sub, s"Unsupported datatype ${left.dataType}")
None
- case mul @ Multiply(left, right, _)
- if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+ case mul @ Multiply(left, right, _) if supportedDataType(left.dataType) =>
createMathExpression(
expr,
left,
@@ -601,13 +599,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
if (!supportedDataType(left.dataType)) {
withInfo(mul, s"Unsupported datatype ${left.dataType}")
}
- if (decimalBeforeSpark34(left.dataType)) {
- withInfo(mul, "Decimal support requires Spark 3.4 or later")
- }
None
- case div @ Divide(left, right, _)
- if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+ case div @ Divide(left, right, _) if supportedDataType(left.dataType) =>
// Datafusion now throws an exception for dividing by zero
// See https://github.com/apache/arrow-datafusion/pull/6792
// For now, use NullIf to swap zeros with nulls.
@@ -627,13 +621,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
if (!supportedDataType(left.dataType)) {
withInfo(div, s"Unsupported datatype ${left.dataType}")
}
- if (decimalBeforeSpark34(left.dataType)) {
- withInfo(div, "Decimal support requires Spark 3.4 or later")
- }
None
- case div @ IntegralDivide(left, right, _)
- if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+ case div @ IntegralDivide(left, right, _) if supportedDataType(left.dataType) =>
val rightExpr = nullIfWhenPrimitive(right)
val dataType = (left.dataType, right.dataType) match {
@@ -680,13 +670,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
if (!supportedDataType(left.dataType)) {
withInfo(div, s"Unsupported datatype ${left.dataType}")
}
- if (decimalBeforeSpark34(left.dataType)) {
- withInfo(div, "Decimal support requires Spark 3.4 or later")
- }
None
- case rem @ Remainder(left, right, _)
- if supportedDataType(left.dataType) && !decimalBeforeSpark34(left.dataType) =>
+ case rem @ Remainder(left, right, _) if supportedDataType(left.dataType) =>
val rightExpr = nullIfWhenPrimitive(right)
createMathExpression(
@@ -703,9 +689,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
if (!supportedDataType(left.dataType)) {
withInfo(rem, s"Unsupported datatype ${left.dataType}")
}
- if (decimalBeforeSpark34(left.dataType)) {
- withInfo(rem, "Decimal support requires Spark 3.4 or later")
- }
None
case EqualTo(left, right) =>
@@ -798,6 +781,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case _: StringType =>
exprBuilder.setStringVal(value.asInstanceOf[UTF8String].toString)
case _: TimestampType => exprBuilder.setLongVal(value.asInstanceOf[Long])
+ case _: TimestampNTZType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: DecimalType =>
// Pass decimal literal as bytes.
val unscaled = value.asInstanceOf[Decimal].toBigDecimal.underlying.unscaledValue
@@ -808,8 +792,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]])
exprBuilder.setBytesVal(byteStr)
case _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
- case dt if isTimestampNTZType(dt) =>
- exprBuilder.setLongVal(value.asInstanceOf[Long])
case dt =>
logWarning(s"Unexpected date type '$dt' for literal value
'$value'")
}
@@ -2242,7 +2224,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: DateType | _: DecimalType | _: BooleanType =>
true
- case dt if isTimestampNTZType(dt) => true
+ case TimestampNTZType => true
case _ => false
}
@@ -2804,16 +2786,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
}
- /**
- * Checks whether `dt` is a decimal type AND whether Spark version is before 3.4
- */
- private def decimalBeforeSpark34(dt: DataType): Boolean = {
- !isSpark34Plus && (dt match {
- case _: DecimalType => true
- case _ => false
- })
- }
-
/**
* Check if the datatypes of shuffle input are supported. This is used for Columnar shuffle
* which supports struct/array.
@@ -2953,9 +2925,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
val canSort = sortOrder.head.dataType match {
case _: BooleanType => true
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
- _: DoubleType | _: TimestampType | _: DecimalType | _: DateType =>
+ _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType | _: DateType =>
true
- case dt if isTimestampNTZType(dt) => true
case _: BinaryType | _: StringType => true
case ArrayType(elementType, _) => canRank(elementType)
case _ => false
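The Divide/IntegralDivide/Remainder hunks above keep the comment about swapping zeros with nulls before handing the expression to DataFusion; a tiny standalone sketch of that NullIf idea (plain Scala, not Comet's protobuf serde):

    object DivideByZeroSketch {
      // Mirrors SQL NULLIF(divisor, 0): a zero divisor becomes None here, so the
      // division yields null instead of DataFusion's divide-by-zero error,
      // matching Spark's semantics.
      def nullIfZero(divisor: Long): Option[Long] =
        if (divisor == 0L) None else Some(divisor)

      def divide(dividend: Long, divisor: Long): Option[Long] =
        nullIfZero(divisor).map(dividend / _)

      def main(args: Array[String]): Unit = {
        println(divide(10L, 2L)) // Some(5)
        println(divide(10L, 0L)) // None, i.e. SQL NULL
      }
    }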
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index 1ada81b97..60df51b08 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -33,9 +33,9 @@ object CometArrayRemove extends CometExpressionSerde with CometExprShim {
import DataTypes._
dt match {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
- _: DecimalType | DateType | TimestampType | StringType | BinaryType =>
+ _: DecimalType | DateType | TimestampType | TimestampNTZType | StringType |
+ BinaryType =>
true
- case t if isTimestampNTZType(t) => true
case ArrayType(elementType, _) => isTypeSupported(elementType)
case _: StructType =>
// https://github.com/apache/datafusion-comet/issues/1307
diff --git a/spark/src/main/spark-3.3/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.3/org/apache/comet/shims/CometExprShim.scala
deleted file mode 100644
index aa6db06d8..000000000
--- a/spark/src/main/spark-3.3/org/apache/comet/shims/CometExprShim.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.comet.shims
-
-import org.apache.comet.expressions.CometEvalMode
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DataType
-
-/**
- * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
- */
-trait CometExprShim {
- /**
- * Returns a tuple of expressions for the `unhex` function.
- */
- protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
- (unhex.child, Literal(false))
- }
-
- protected def isTimestampNTZType(dt: DataType): Boolean =
- dt.typeName == "timestamp_ntz" // `TimestampNTZType` is private
-
- protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalMode.fromBoolean(c.ansiEnabled)
-}
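The deleted spark-3.3 shim above is why the isTimestampNTZType indirection existed at all: on 3.3 the type could only be detected through its typeName, while the Spark versions Comet still supports can match it directly. A short sketch of the direct check (assumes spark-sql on the classpath; the object name is illustrative):

    import org.apache.spark.sql.types.{DataType, TimestampNTZType}

    object TimestampNtzCheckSketch {
      // On Spark 3.4+ a plain type match replaces the old
      // dt.typeName == "timestamp_ntz" comparison from the deleted shim.
      def isTimestampNtz(dt: DataType): Boolean = dt match {
        case _: TimestampNTZType => true
        case _ => false
      }

      def main(args: Array[String]): Unit =
        println(isTimestampNtz(TimestampNTZType)) // true
    }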
diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
index 7709957b4..5f4e3fba2 100644
--- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala
@@ -20,7 +20,6 @@ package org.apache.comet.shims
import org.apache.comet.expressions.CometEvalMode
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, TimestampNTZType}
/**
* `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
@@ -33,11 +32,6 @@ trait CometExprShim {
(unhex.child, Literal(unhex.failOnError))
}
- protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
- case _: TimestampNTZType => true
- case _ => false
- }
-
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
}
diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
index 7709957b4..5f4e3fba2 100644
--- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
@@ -20,7 +20,6 @@ package org.apache.comet.shims
import org.apache.comet.expressions.CometEvalMode
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, TimestampNTZType}
/**
* `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
@@ -33,11 +32,6 @@ trait CometExprShim {
(unhex.child, Literal(unhex.failOnError))
}
- protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
- case _: TimestampNTZType => true
- case _ => false
- }
-
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
index 7709957b4..5f4e3fba2 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala
@@ -20,7 +20,6 @@ package org.apache.comet.shims
import org.apache.comet.expressions.CometEvalMode
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, TimestampNTZType}
/**
* `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
@@ -33,11 +32,6 @@ trait CometExprShim {
(unhex.child, Literal(unhex.failOnError))
}
- protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
- case _: TimestampNTZType => true
- case _ => false
- }
-
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
}
diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index f8d709dc6..8b3299dc9 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.{array, col, expr, lit, udf}
import org.apache.spark.sql.types.StructType
-import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark35Plus}
+import org.apache.comet.CometSparkSessionExtensions.isSpark35Plus
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
@@ -135,7 +135,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
test("array_append") {
- assume(isSpark34Plus)
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
@@ -182,7 +181,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
test("ArrayInsert") {
- assume(isSpark34Plus)
Seq(true, false).foreach(dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -206,7 +204,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
test("ArrayInsertUnsupportedArgs") {
// This test checks that the else branch in ArrayInsert
// mapping to the comet is valid and fallback to spark is working fine.
- assume(isSpark34Plus)
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = false, 10000)
@@ -296,7 +293,6 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
}
test("array_compact") {
- assume(isSpark34Plus)
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 208aef534..7531a9b9a 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -902,7 +902,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
test("cast TimestampType to LongType") {
// https://github.com/apache/datafusion-comet/issues/1441
assume(!CometConf.isExperimentalNativeScan)
- assume(CometSparkSessionExtensions.isSpark33Plus)
castTest(generateTimestampsExtended(), DataTypes.LongType)
}
@@ -1224,22 +1223,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
sparkException.getMessage
.replace(".WITH_SUGGESTION] ", "]")
.startsWith(cometMessage))
- } else if (CometSparkSessionExtensions.isSpark34Plus) {
+ } else {
// for Spark 3.4 we expect to reproduce the error message exactly
assert(cometMessage == sparkMessage)
- } else {
- // for Spark 3.3 we just need to strip the prefix from the Comet message
- // before comparing
- val cometMessageModified = cometMessage
- .replace("[CAST_INVALID_INPUT] ", "")
- .replace("[CAST_OVERFLOW] ", "")
- .replace("[NUMERIC_VALUE_OUT_OF_RANGE] ", "")
-
- if (sparkMessage.contains("cannot be represented as")) {
- assert(cometMessage.contains("cannot be represented as"))
- } else {
- assert(cometMessageModified == sparkMessage)
- }
}
}
}
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 7e8e34bfc..48da86a74 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.sql.types.{Decimal, DecimalType}
-import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._
@@ -71,9 +71,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("decimals divide by zero") {
- // TODO: enable Spark 3.3 tests after supporting decimal divide operation
- assume(isSpark34Plus)
-
Seq(true, false).foreach { dictionary =>
withSQLConf(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false",
@@ -594,7 +591,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("date_trunc with format array") {
- assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182")
withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
val numRows = 1000
Seq(true, false).foreach { dictionaryEnabled =>
@@ -998,9 +994,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("decimals arithmetic and comparison") {
- // TODO: enable Spark 3.3 tests after supporting decimal reminder operation
- assume(isSpark34Plus)
-
def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
spark
@@ -1072,7 +1065,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("scalar decimal arithmetic operations") {
- assume(isSpark34Plus)
withTable("tbl") {
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
sql("CREATE TABLE tbl (a INT) USING PARQUET")
@@ -1737,7 +1729,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("Decimal binary ops multiply is aligned to Spark") {
- assume(isSpark34Plus)
Seq(true, false).foreach { allowPrecisionLoss =>
withSQLConf(
"spark.sql.decimalOperations.allowPrecisionLoss" ->
allowPrecisionLoss.toString) {
@@ -1795,7 +1786,6 @@ class CometExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelper {
}
test("Decimal random number tests") {
- assume(isSpark34Plus) // Only Spark 3.4+ has the fix for SPARK-45786
val rand = scala.util.Random
def makeNum(p: Int, s: Int): String = {
val int1 = rand.nextLong()
@@ -1860,7 +1850,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("explain comet") {
- assume(isSpark34Plus)
withSQLConf(
SQLConf.ANSI_ENABLED.key -> "false",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
@@ -2716,17 +2705,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
| from tbl1 t1 join tbl2 t2 on t1._id = t2._id
| order by t1._id""".stripMargin)
- if (isSpark34Plus) {
- // decimal support requires Spark 3.4 or later
- checkSparkAnswerAndOperator("""
- |select
- | t1._12 div t2._12, div(t1._12, t2._12),
- | t1._15 div t2._15, div(t1._15, t2._15),
- | t1._16 div t2._16, div(t1._16, t2._16),
- | t1._17 div t2._17, div(t1._17, t2._17)
- | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
- | order by t1._id""".stripMargin)
- }
+ // decimal support requires Spark 3.4 or later
+ checkSparkAnswerAndOperator("""
+ |select
+ | t1._12 div t2._12, div(t1._12, t2._12),
+ | t1._15 div t2._15, div(t1._15, t2._15),
+ | t1._16 div t2._16, div(t1._16, t2._16),
+ | t1._17 div t2._17, div(t1._17, t2._17)
+ | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+ | order by t1._id""".stripMargin)
}
}
}
@@ -2735,8 +2722,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("test integral divide overflow for decimal") {
- // decimal support requires Spark 3.4 or later
- assume(isSpark34Plus)
if (isSpark40Plus) {
Seq(true, false)
} else
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 3215a984e..2f957b606 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.functions.{count_distinct, sum}
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
/**
@@ -884,9 +883,6 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
test("final decimal avg") {
- // TODO: enable decimal average for Spark 3.3
- assume(isSpark34Plus)
-
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index c02c9ce5e..3b4a78690 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plus, isSpark35Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
class CometExecSuite extends CometTestBase {
@@ -260,7 +260,6 @@ class CometExecSuite extends CometTestBase {
}
test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(
CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
@@ -290,7 +289,6 @@ class CometExecSuite extends CometTestBase {
}
test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(
CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
@@ -370,7 +368,6 @@ class CometExecSuite extends CometTestBase {
}
test("Repeated shuffle exchange don't fail") {
- assume(isSpark33Plus)
Seq("true", "false").foreach { aqeEnabled =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
@@ -392,7 +389,6 @@ class CometExecSuite extends CometTestBase {
}
test("try_sum should return null if overflow happens before merging") {
- assume(isSpark33Plus, "try_sum is available in Spark 3.3+")
val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v")
val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2)
.map(Period.ofMonths)
@@ -419,7 +415,6 @@ class CometExecSuite extends CometTestBase {
}
test("CometBroadcastExchangeExec") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
@@ -445,7 +440,6 @@ class CometExecSuite extends CometTestBase {
}
test("CometBroadcastExchangeExec: empty broadcast") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_b") {
@@ -663,7 +657,6 @@ class CometExecSuite extends CometTestBase {
}
test("Comet native metrics: BroadcastHashJoin") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
val df = sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON
t1._1 = t2._1")
@@ -1585,7 +1578,6 @@ class CometExecSuite extends CometTestBase {
}
test("Fallback to Spark for TakeOrderedAndProjectExec with offset") {
- assume(isSpark34Plus)
Seq("true", "false").foreach(aqeEnabled =>
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
withTable("t1") {
@@ -1702,13 +1694,11 @@ class CometExecSuite extends CometTestBase {
// Verify that the BatchScanExec nodes supported columnar output when requested for Spark 3.4+.
// Earlier versions support columnar output for fewer type.
- if (isSpark34Plus) {
- val leaves = df.queryExecution.executedPlan.collectLeaves()
- if (parquetVectorized && isSpark34Plus) {
- assert(leaves.forall(_.supportsColumnar))
- } else {
- assert(!leaves.forall(_.supportsColumnar))
- }
+ val leaves = df.queryExecution.executedPlan.collectLeaves()
+ if (parquetVectorized) {
+ assert(leaves.forall(_.supportsColumnar))
+ } else {
+ assert(!leaves.forall(_.supportsColumnar))
}
}
}
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index e68d63ef1..e0a873dd1 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.Decimal
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometJoinSuite extends CometTestBase {
import testImplicits._
@@ -75,7 +74,6 @@ class CometJoinSuite extends CometTestBase {
}
test("Broadcast HashJoin without join filter") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> "100",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
@@ -103,7 +101,6 @@ class CometJoinSuite extends CometTestBase {
}
test("Broadcast HashJoin with join filter") {
- assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> "100",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index 329688fc5..5b58befcd 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.col
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
@@ -64,10 +63,6 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
var allTypes: Seq[Int] = (1 to 20)
- if (!isSpark34Plus) {
- // TODO: Remove this once after https://github.com/apache/arrow/issues/40038 is fixed
- allTypes = allTypes.filterNot(Set(14).contains)
- }
allTypes.map(i => s"_$i").foreach { c =>
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 4099759cd..6e9b731d4 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -46,7 +46,7 @@ import org.apache.spark.unsafe.types.UTF8String
import com.google.common.primitives.UnsignedLong
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
-import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark40Plus, usingDataFusionParquetExec}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, usingDataFusionParquetExec}
abstract class ParquetReadSuite extends CometTestBase {
import testImplicits._
@@ -345,45 +345,24 @@ abstract class ParquetReadSuite extends CometTestBase {
n: Int,
pageSize: Int): Seq[Option[Int]] = {
val schemaStr = {
- if (isSpark34Plus) {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_8);
- | optional int32 _10(UINT_16);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional FIXED_LEN_BYTE_ARRAY(3) _14;
- |}
- """.stripMargin
- } else {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_8);
- | optional int32 _10(UINT_16);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional binary _14(UTF8);
- |}
- """.stripMargin
- }
+ """
+ |message root {
+ | optional boolean _1;
+ | optional int32 _2(INT_8);
+ | optional int32 _3(INT_16);
+ | optional int32 _4;
+ | optional int64 _5;
+ | optional float _6;
+ | optional double _7;
+ | optional binary _8(UTF8);
+ | optional int32 _9(UINT_8);
+ | optional int32 _10(UINT_16);
+ | optional int32 _11(UINT_32);
+ | optional int64 _12(UINT_64);
+ | optional binary _13(ENUM);
+ | optional FIXED_LEN_BYTE_ARRAY(3) _14;
+ |}
+ """.stripMargin
}
val schema = MessageTypeParser.parseMessageType(schemaStr)
@@ -441,11 +420,7 @@ abstract class ParquetReadSuite extends CometTestBase {
Row(null, null, null, null, null, null, null, null, null, null, null, null, null,
null)
case Some(i) =>
- val flba_field = if (isSpark34Plus) {
- Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii
- } else {
- (i % 10).toString * 3
- }
+ val flba_field = Array.fill(3)(i % 10 + 48) // char '0' is 48 in ascii
Row(
i % 2 == 0,
i.toByte,
@@ -964,7 +939,6 @@ abstract class ParquetReadSuite extends CometTestBase {
}
test("FIXED_LEN_BYTE_ARRAY support") {
- assume(isSpark34Plus)
Seq(true, false).foreach { dictionaryEnabled =>
def makeRawParquetFile(path: Path): Unit = {
val schemaStr =
@@ -1194,7 +1168,7 @@ abstract class ParquetReadSuite extends CometTestBase {
test("row group skipping doesn't overflow when reading into larger type") {
// Spark 4.0 no longer fails for widening types SPARK-40876
// https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
- assume(isSpark34Plus && !isSpark40Plus && !usingDataFusionParquetExec(conf))
+ assume(!isSpark40Plus && !usingDataFusionParquetExec(conf))
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index b9fc56b94..6c5951426 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.TestSparkSession
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.shims.ShimCometTPCHQuerySuite
/**
@@ -259,9 +258,6 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
s"tpch/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
test(name) {
- // Only run the tests in Spark 3.4+
- assume(isSpark34Plus)
-
val goldenFile = new File(s"$baseResourcePath", s"$name.sql.out")
joinConfs.foreach { conf =>
System.gc() // Workaround for GitHub Actions memory limitation, see also SPARK-37368
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 10c5ca210..7bdec4215 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -47,7 +47,6 @@ import org.apache.spark.sql.test._
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.comet._
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.shims.ShimCometSparkSessionExtensions
/**
@@ -440,114 +439,57 @@ abstract class CometTestBase
// here: https://github.com/apache/parquet-java/issues/3142
// here: https://github.com/apache/arrow-rs/issues/7040
// and here: https://github.com/apache/datafusion-comet/issues/1348
- if (isSpark34Plus) {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_32);
- | optional int32 _10(UINT_32);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional FIXED_LEN_BYTE_ARRAY(3) _14;
- | optional int32 _15(DECIMAL(5, 2));
- | optional int64 _16(DECIMAL(18, 10));
- | optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
- | optional INT64 _18(TIMESTAMP(MILLIS,true));
- | optional INT64 _19(TIMESTAMP(MICROS,true));
- | optional INT32 _20(DATE);
- | optional INT32 _id;
- |}
- """.stripMargin
- } else {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_32);
- | optional int32 _10(UINT_32);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional binary _14(UTF8);
- | optional int32 _15(DECIMAL(5, 2));
- | optional int64 _16(DECIMAL(18, 10));
- | optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
- | optional INT64 _18(TIMESTAMP(MILLIS,true));
- | optional INT64 _19(TIMESTAMP(MICROS,true));
- | optional INT32 _20(DATE);
- | optional INT32 _id;
- |}
- """.stripMargin
- }
+ """
+ |message root {
+ | optional boolean _1;
+ | optional int32 _2(INT_8);
+ | optional int32 _3(INT_16);
+ | optional int32 _4;
+ | optional int64 _5;
+ | optional float _6;
+ | optional double _7;
+ | optional binary _8(UTF8);
+ | optional int32 _9(UINT_32);
+ | optional int32 _10(UINT_32);
+ | optional int32 _11(UINT_32);
+ | optional int64 _12(UINT_64);
+ | optional binary _13(ENUM);
+ | optional FIXED_LEN_BYTE_ARRAY(3) _14;
+ | optional int32 _15(DECIMAL(5, 2));
+ | optional int64 _16(DECIMAL(18, 10));
+ | optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
+ | optional INT64 _18(TIMESTAMP(MILLIS,true));
+ | optional INT64 _19(TIMESTAMP(MICROS,true));
+ | optional INT32 _20(DATE);
+ | optional INT32 _id;
+ |}
+ """.stripMargin
} else {
-
- if (isSpark34Plus) {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_8);
- | optional int32 _10(UINT_16);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional FIXED_LEN_BYTE_ARRAY(3) _14;
- | optional int32 _15(DECIMAL(5, 2));
- | optional int64 _16(DECIMAL(18, 10));
- | optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
- | optional INT64 _18(TIMESTAMP(MILLIS,true));
- | optional INT64 _19(TIMESTAMP(MICROS,true));
- | optional INT32 _20(DATE);
- | optional INT32 _id;
- |}
- """.stripMargin
- } else {
- """
- |message root {
- | optional boolean _1;
- | optional int32 _2(INT_8);
- | optional int32 _3(INT_16);
- | optional int32 _4;
- | optional int64 _5;
- | optional float _6;
- | optional double _7;
- | optional binary _8(UTF8);
- | optional int32 _9(UINT_8);
- | optional int32 _10(UINT_16);
- | optional int32 _11(UINT_32);
- | optional int64 _12(UINT_64);
- | optional binary _13(ENUM);
- | optional binary _14(UTF8);
- | optional int32 _15(DECIMAL(5, 2));
- | optional int64 _16(DECIMAL(18, 10));
- | optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
- | optional INT64 _18(TIMESTAMP(MILLIS,true));
- | optional INT64 _19(TIMESTAMP(MICROS,true));
- | optional INT32 _20(DATE);
- | optional INT32 _id;
- |}
- """.stripMargin
- }
+ """
+ |message root {
+ | optional boolean _1;
+ | optional int32 _2(INT_8);
+ | optional int32 _3(INT_16);
+ | optional int32 _4;
+ | optional int64 _5;
+ | optional float _6;
+ | optional double _7;
+ | optional binary _8(UTF8);
+ | optional int32 _9(UINT_8);
+ | optional int32 _10(UINT_16);
+ | optional int32 _11(UINT_32);
+ | optional int64 _12(UINT_64);
+ | optional binary _13(ENUM);
+ | optional FIXED_LEN_BYTE_ARRAY(3) _14;
+ | optional int32 _15(DECIMAL(5, 2));
+ | optional int64 _16(DECIMAL(18, 10));
+ | optional FIXED_LEN_BYTE_ARRAY(16) _17(DECIMAL(38, 37));
+ | optional INT64 _18(TIMESTAMP(MILLIS,true));
+ | optional INT64 _19(TIMESTAMP(MICROS,true));
+ | optional INT32 _20(DATE);
+ | optional INT32 _id;
+ |}
+ """.stripMargin
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index f4c310d8f..2855e553c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.TestSparkSession
import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark35Plus, isSpark40Plus}
+import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
/**
* Similar to [[org.apache.spark.sql.PlanStabilitySuite]], checks that TPC-DS Comet plans don't
@@ -255,9 +255,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
* matches a golden file or it will create a new golden file.
*/
protected def testQuery(tpcdsGroup: String, query: String, suffix: String = ""): Unit = {
- // Only run the tests in Spark 3.4+
- assume(isSpark34Plus)
-
val queryString = resourceToString(
s"$tpcdsGroup/$query.sql",
classLoader = Thread.currentThread().getContextClassLoader)
diff --git a/spark/src/test/spark-3.3-plus/org/apache/comet/CometExpression3_3PlusSuite.scala b/spark/src/test/spark-3.3-plus/org/apache/comet/CometExpression3_3PlusSuite.scala
deleted file mode 100644
index d54c8dad0..000000000
--- a/spark/src/test/spark-3.3-plus/org/apache/comet/CometExpression3_3PlusSuite.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet
-
-import org.apache.spark.sql.{Column, CometTestBase}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
-import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.util.sketch.BloomFilter
-import java.io.ByteArrayOutputStream
-import scala.util.Random
-
-class CometExpression3_3PlusSuite extends CometTestBase with AdaptiveSparkPlanHelper {
- import testImplicits._
-
- val func_might_contain = new FunctionIdentifier("might_contain")
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- // Register 'might_contain' to builtin.
- spark.sessionState.functionRegistry.registerFunction(func_might_contain,
- new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
- (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
- }
-
- override def afterAll(): Unit = {
- spark.sessionState.functionRegistry.dropFunction(func_might_contain)
- super.afterAll()
- }
-
- test("test BloomFilterMightContain can take a constant value input") {
- val table = "test"
-
- withTable(table) {
- sql(s"create table $table(col1 long, col2 int) using parquet")
- sql(s"insert into $table values (201, 1)")
- checkSparkAnswerAndOperator(
- s"""
- |SELECT might_contain(
- |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) FROM $table
- |""".stripMargin)
- }
- }
-
- test("test NULL inputs for BloomFilterMightContain") {
- val table = "test"
-
- withTable(table) {
- sql(s"create table $table(col1 long, col2 int) using parquet")
- sql(s"insert into $table values (201, 1), (null, 2)")
- checkSparkAnswerAndOperator(
- s"""
- |SELECT might_contain(null, null) both_null,
- | might_contain(null, 1L) null_bf,
- | might_contain(
- | X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) null_value
- | FROM $table
- |""".stripMargin)
- }
- }
-
- test("test BloomFilterMightContain from random input") {
- val (longs, bfBytes) = bloomFilterFromRandomInput(10000, 10000)
- val table = "test"
-
- withTable(table) {
- sql(s"create table $table(col1 long, col2 binary) using parquet")
- spark.createDataset(longs).map(x => (x, bfBytes)).toDF("col1", "col2").write.insertInto(table)
- val df = spark.table(table).select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
- checkSparkAnswerAndOperator(df)
- // check with scalar subquery
- checkSparkAnswerAndOperator(
- s"""
- |SELECT might_contain((select first(col2) as col2 from $table), col1) FROM $table
- |""".stripMargin)
- }
- }
-
- private def bloomFilterFromRandomInput(expectedItems: Long, expectedBits: Long): (Seq[Long], Array[Byte]) = {
- val bf = BloomFilter.create(expectedItems, expectedBits)
- val longs = (0 until expectedItems.toInt).map(_ => Random.nextLong())
- longs.foreach(bf.put)
- val os = new ByteArrayOutputStream()
- bf.writeTo(os)
- (longs, os.toByteArray)
- }
-}
diff --git a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala b/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
index 764f7b18d..931f6900a 100644
--- a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
+++ b/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
@@ -19,18 +19,37 @@
package org.apache.comet.exec
+import java.io.ByteArrayOutputStream
+import scala.util.Random
+import org.apache.spark.sql.{Column, CometTestBase}
+import org.apache.comet.CometConf
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.util.sketch.BloomFilter
import org.scalactic.source.Position
import org.scalatest.Tag
-
-import org.apache.spark.sql.CometTestBase
-import org.apache.comet.CometConf
-
/**
* This test suite contains tests for only Spark 3.4+.
*/
class CometExec3_4PlusSuite extends CometTestBase {
import testImplicits._
+ val func_might_contain = new FunctionIdentifier("might_contain")
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ // Register 'might_contain' to builtin.
+ spark.sessionState.functionRegistry.registerFunction(func_might_contain,
+ new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
+ (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
+ }
+
+ override def afterAll(): Unit = {
+ spark.sessionState.functionRegistry.dropFunction(func_might_contain)
+ super.afterAll()
+ }
+
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
@@ -100,4 +119,62 @@ class CometExec3_4PlusSuite extends CometTestBase {
checkSparkAnswer(mapData.toDF().offset(99))
}
}
+
+ test("test BloomFilterMightContain can take a constant value input") {
+ val table = "test"
+
+ withTable(table) {
+ sql(s"create table $table(col1 long, col2 int) using parquet")
+ sql(s"insert into $table values (201, 1)")
+ checkSparkAnswerAndOperator(
+ s"""
+ |SELECT might_contain(
+ |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) FROM $table
+ |""".stripMargin)
+ }
+ }
+
+ test("test NULL inputs for BloomFilterMightContain") {
+ val table = "test"
+
+ withTable(table) {
+ sql(s"create table $table(col1 long, col2 int) using parquet")
+ sql(s"insert into $table values (201, 1), (null, 2)")
+ checkSparkAnswerAndOperator(
+ s"""
+ |SELECT might_contain(null, null) both_null,
+ | might_contain(null, 1L) null_bf,
+ | might_contain(
+ | X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) null_value
+ | FROM $table
+ |""".stripMargin)
+ }
+ }
+
+ test("test BloomFilterMightContain from random input") {
+ val (longs, bfBytes) = bloomFilterFromRandomInput(10000, 10000)
+ val table = "test"
+
+ withTable(table) {
+ sql(s"create table $table(col1 long, col2 binary) using parquet")
+ spark.createDataset(longs).map(x => (x, bfBytes)).toDF("col1", "col2").write.insertInto(table)
+ val df = spark.table(table).select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
+ checkSparkAnswerAndOperator(df)
+ // check with scalar subquery
+ checkSparkAnswerAndOperator(
+ s"""
+ |SELECT might_contain((select first(col2) as col2 from $table), col1) FROM $table
+ |""".stripMargin)
+ }
+ }
+
+ private def bloomFilterFromRandomInput(expectedItems: Long, expectedBits: Long): (Seq[Long], Array[Byte]) = {
+ val bf = BloomFilter.create(expectedItems, expectedBits)
+ val longs = (0 until expectedItems.toInt).map(_ => Random.nextLong())
+ longs.foreach(bf.put)
+ val os = new ByteArrayOutputStream()
+ bf.writeTo(os)
+ (longs, os.toByteArray)
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]