This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new b8954029c chore: [FOLLOWUP] Drop support for Spark 3.3 (EOL) (#1534)
b8954029c is described below
commit b8954029c9d205d10206df8772c8677752e92155
Author: KAZUYUKI TANIMURA <[email protected]>
AuthorDate: Mon Mar 17 11:13:02 2025 -0700
chore: [FOLLOWUP] Drop support for Spark 3.3 (EOL) (#1534)
---
common/pom.xml | 1 -
.../apache/comet/parquet/ConstantColumnReader.java | 7 +-
.../spark/sql/comet/CastOverflowException.scala | 14 +++-
.../org/apache/comet/shims/ShimFileFormat.scala} | 14 ++--
.../comet/shims/ShimCastOverflowException.scala | 37 -----------
.../org/apache/comet/shims/ShimFileFormat.scala | 20 +++---
.../comet/shims/ShimCastOverflowException.scala | 37 -----------
.../org/apache/comet/shims/ShimFileFormat.scala | 50 --------------
.../comet/shims/ShimResolveDefaultColumns.scala | 38 -----------
.../comet/shims/ShimCastOverflowException.scala | 37 -----------
dev/release/build-release-comet.sh | 2 -
docs/source/user-guide/installation.md | 3 -
pom.xml | 9 ---
spark/pom.xml | 4 --
.../apache/comet/CometSparkSessionExtensions.scala | 9 ++-
.../comet/parquet/CometParquetFileFormat.scala | 2 +-
.../CometParquetPartitionReaderFactory.scala | 2 +-
.../org/apache/comet/serde/QueryPlanSerde.scala | 4 +-
.../scala/org/apache/comet/serde/aggregates.scala | 6 +-
.../src/main/scala/org/apache/spark/Plugins.scala | 4 +-
.../spark/sql/comet/CometBatchScanExec.scala | 5 +-
.../sql/comet/CometBroadcastExchangeExec.scala | 22 ++-----
.../org/apache/spark/sql/comet/CometScanExec.scala | 3 +-
.../sql/comet/CometTakeOrderedAndProjectExec.scala | 7 +-
.../shuffle/CometShuffleExchangeExec.scala | 15 -----
.../org/apache/spark/sql/comet/operators.scala | 15 +++--
.../comet/plans/AliasAwareOutputExpression.scala | 64 ++----------------
.../PartitioningPreservingUnaryExecNode.scala | 76 ----------------------
.../org/apache/comet/shims/ShimSQLConf.scala} | 7 +-
.../spark/sql/comet/shims/ShimCometScanExec.scala | 55 +++-------------
.../org/apache/comet/shims/ShimSQLConf.scala | 9 ---
.../spark/sql/comet/shims/ShimCometScanExec.scala | 4 --
.../comet/shims/ShimCometBatchScanExec.scala | 33 ----------
.../shims/ShimCometBroadcastHashJoinExec.scala | 39 -----------
.../comet/shims/ShimCometShuffleExchangeExec.scala | 2 +-
.../shims/ShimCometSparkSessionExtensions.scala | 14 +---
.../apache/comet/shims/ShimQueryPlanSerde.scala | 18 -----
.../shims/ShimCometBroadcastExchangeExec.scala | 51 ---------------
.../spark/comet/shims/ShimCometDriverPlugin.scala | 7 --
.../comet/shims/ShimCometBatchScanExec.scala | 29 ---------
.../shims/ShimCometBroadcastHashJoinExec.scala | 31 ---------
.../shims/ShimCometSparkSessionExtensions.scala | 6 +-
.../apache/comet/shims/ShimQueryPlanSerde.scala | 5 +-
.../org/apache/comet/shims/ShimSQLConf.scala | 4 --
.../shims/ShimCometBroadcastExchangeExec.scala | 30 ---------
.../spark/comet/shims/ShimCometDriverPlugin.scala | 4 --
.../spark/sql/comet/shims/ShimCometScanExec.scala | 4 --
.../org/apache/comet/shims/ShimSQLConf.scala | 46 -------------
.../scala/org/apache/comet/CometCastSuite.scala | 1 -
.../org/apache/comet/CometExpressionSuite.scala | 46 ++++++-------
.../apache/comet/exec/CometExec3_4PlusSuite.scala | 38 +++++++----
.../org/apache/comet/exec/CometExecSuite.scala | 3 +-
.../org/apache/spark/sql/CometTPCHQuerySuite.scala | 8 +--
.../spark/sql/comet/CometPlanStabilitySuite.scala | 6 +-
.../comet/shims/ShimCometTPCHQuerySuite.scala | 0
55 files changed, 137 insertions(+), 870 deletions(-)
diff --git a/common/pom.xml b/common/pom.xml
index 52939dd7d..9b0927b40 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -181,7 +181,6 @@ under the License.
<sources>
<source>src/main/${shims.majorVerSrc}</source>
<source>src/main/${shims.minorVerSrc}</source>
- <source>src/main/${shims.pre35Src}</source>
</sources>
</configuration>
</execution>
diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
index 5fd348eeb..e010f8ab7 100644
--- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
@@ -23,11 +23,10 @@ import java.math.BigInteger;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;
-import org.apache.comet.shims.ShimResolveDefaultColumns;
-
/**
 * A column reader that always return constant vectors. Used for reading partition columns, for
* instance.
@@ -41,7 +40,9 @@ public class ConstantColumnReader extends MetadataColumnReader {
   public ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
     this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
- this.value = ShimResolveDefaultColumns.getExistenceDefaultValue(field);
+    this.value =
+        ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
+            0];
init(value);
}
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala b/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala
index a5d674de2..ccd525939 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/CastOverflowException.scala
@@ -19,7 +19,17 @@
package org.apache.spark.sql.comet
-import org.apache.spark.sql.comet.shims.ShimCastOverflowException
+import org.apache.spark.SparkArithmeticException
+import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
+import org.apache.spark.sql.internal.SQLConf
class CastOverflowException(t: String, from: String, to: String)
- extends ShimCastOverflowException(t, from, to) {}
+ extends SparkArithmeticException(
+ "CAST_OVERFLOW",
+ Map(
+ "value" -> t,
+ "sourceType" -> s""""$from"""",
+ "targetType" -> s""""$to"""",
+ "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
+ Array.empty,
+ "") {}
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimResolveDefaultColumns.scala b/common/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala
similarity index 61%
rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimResolveDefaultColumns.scala
rename to common/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala
index 60e21765b..7b4911e81 100644
--- a/common/src/main/spark-4.0/org/apache/comet/shims/ShimResolveDefaultColumns.scala
+++ b/common/src/main/spark-3.4/org/apache/comet/shims/ShimFileFormat.scala
@@ -19,11 +19,15 @@
package org.apache.comet.shims
+import org.apache.spark.sql.execution.datasources.{FileFormat, RowIndexUtil}
+import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.types.{StructField, StructType}
+object ShimFileFormat {
-object ShimResolveDefaultColumns {
- def getExistenceDefaultValue(field: StructField): Any =
-    ResolveDefaultColumns.getExistenceDefaultValues(StructType(Seq(field))).head
+  // A name for a temporary column that holds row indexes computed by the file format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+
+ def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
+ RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
}
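A sketch of how a caller can use the shim above; the schema is illustrative:

    import org.apache.spark.sql.types.{LongType, StructField, StructType}
    import org.apache.comet.shims.ShimFileFormat

    object RowIndexLookupExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical read schema that carries the temporary row-index column.
        val schema = StructType(Seq(
          StructField("id", LongType),
          StructField(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)))
        // Position of the row-index column within the schema.
        println(ShimFileFormat.findRowIndexColumnIndexInSchema(schema))
      }
    }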
diff --git a/common/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index f641b192b..000000000
--- a/common/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-// Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
- extends SparkArithmeticException(
- "CAST_OVERFLOW",
- Map(
- "value" -> t,
- "sourceType" -> s""""$from"""",
- "targetType" -> s""""$to"""",
- "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
- Array.empty,
- "") {}
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala b/common/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala
similarity index 58%
rename from spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
rename to common/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala
index 983e099f8..1702db135 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
+++ b/common/src/main/spark-3.5/org/apache/comet/shims/ShimFileFormat.scala
@@ -19,17 +19,15 @@
package org.apache.comet.shims
-import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
+import org.apache.spark.sql.types.StructType
-trait ShimCometTakeOrderedAndProjectExec {
+object ShimFileFormat {
+  // A name for a temporary column that holds row indexes computed by the file format reader
+  // until they can be placed in the _metadata struct.
+  val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
- /**
- * TODO: delete after dropping Spark 3.3 support
- */
- protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = {
- plan.getClass.getDeclaredFields
- .filter(_.getName == "offset")
- .map { a => a.setAccessible(true); a.get(plan).asInstanceOf[Int] }
- .headOption
- }
+ def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
+ ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
}
diff --git a/common/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index f641b192b..000000000
--- a/common/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-// Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
- extends SparkArithmeticException(
- "CAST_OVERFLOW",
- Map(
- "value" -> t,
- "sourceType" -> s""""$from"""",
- "targetType" -> s""""$to"""",
- "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
- Array.empty,
- "") {}
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
deleted file mode 100644
index c34c947b5..000000000
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimFileFormat.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.types.{LongType, StructField, StructType}
-
-object ShimFileFormat {
-
-  // TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX
- val ROW_INDEX = "row_index"
-
-  // A name for a temporary column that holds row indexes computed by the file format reader
- // until they can be placed in the _metadata struct.
- // TODO: remove after dropping Spark 3.3 support and directly use
- // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
- val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"
-
- // TODO: remove after dropping Spark 3.3 support and directly use
- // RowIndexUtil.findRowIndexColumnIndexInSchema
- def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
- sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) =>
- field.name == ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
- } match {
- case Some((field: StructField, idx: Int)) =>
- if (field.dataType != LongType) {
- throw new RuntimeException(
- s"${ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of
LongType")
- }
- idx
- case _ => -1
- }
- }
-}
diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala b/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
deleted file mode 100644
index 4f7d49831..000000000
--- a/common/src/main/spark-3.x/org/apache/comet/shims/ShimResolveDefaultColumns.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import scala.util.Try
-
-import org.apache.spark.sql.types.{StructField, StructType}
-
-object ShimResolveDefaultColumns {
-  // TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns
- def getExistenceDefaultValue(field: StructField): Any =
- Try {
- // scalastyle:off classforname
-      Class.forName("org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$")
- // scalastyle:on classforname
- }.map { objClass =>
- val objInstance = objClass.getField("MODULE$").get(null)
-      val method = objClass.getMethod("getExistenceDefaultValues", classOf[StructType])
-      method.invoke(objInstance, StructType(Seq(field))).asInstanceOf[Array[Any]].head
- }.getOrElse(null)
-}
diff --git a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala b/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
deleted file mode 100644
index f641b192b..000000000
--- a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCastOverflowException.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.shims
-
-import org.apache.spark.SparkArithmeticException
-import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
-import org.apache.spark.sql.internal.SQLConf
-
-// TODO: Only the Spark 3.3 version of this class is different from the others.
-// Remove this class after dropping Spark 3.3 support.
-class ShimCastOverflowException(t: String, from: String, to: String)
- extends SparkArithmeticException(
- "CAST_OVERFLOW",
- Map(
- "value" -> t,
- "sourceType" -> s""""$from"""",
- "targetType" -> s""""$to"""",
- "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
- Array.empty,
- "") {}
diff --git a/dev/release/build-release-comet.sh b/dev/release/build-release-comet.sh
index bd5f43e09..e8339213e 100755
--- a/dev/release/build-release-comet.sh
+++ b/dev/release/build-release-comet.sh
@@ -192,8 +192,6 @@ LOCAL_REPO=$(mktemp -d /tmp/comet-staging-repo-XXXXX)
 ./mvnw "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.4 -P scala-2.12 -DskipTests install
 ./mvnw "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.4 -P scala-2.13 -DskipTests install
-./mvnw "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.3 -P scala-2.12 -DskipTests install
-./mvnw "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.3 -P scala-2.13 -DskipTests install
 ./mvnw "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.5 -P scala-2.12 -DskipTests install
 ./mvnw "-Dmaven.repo.local=${LOCAL_REPO}" -P spark-3.5 -P scala-2.13 -DskipTests install
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index 937280ae4..2a2fc9c8c 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -32,7 +32,6 @@ Make sure the following requirements are met and software installed on your mach
Comet currently supports the following versions of Apache Spark:
-- 3.3.x (Java 8/11/17, Scala 2.12/2.13)
- 3.4.x (Java 8/11/17, Scala 2.12/2.13)
- 3.5.x (Java 8/11/17, Scala 2.12/2.13)
@@ -51,8 +50,6 @@ is currently necessary to build from source.
Here are the direct links for downloading the Comet 0.6.0 jar file.
-- [Comet plugin for Spark 3.3 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.3_2.12/0.6.0/comet-spark-spark3.3_2.12-0.6.0.jar)
-- [Comet plugin for Spark 3.3 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.3_2.13/0.6.0/comet-spark-spark3.3_2.13-0.6.0.jar)
 - [Comet plugin for Spark 3.4 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.12/0.6.0/comet-spark-spark3.4_2.12-0.6.0.jar)
 - [Comet plugin for Spark 3.4 / Scala 2.13](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.4_2.13/0.6.0/comet-spark-spark3.4_2.13-0.6.0.jar)
 - [Comet plugin for Spark 3.5 / Scala 2.12](https://repo1.maven.org/maven2/org/apache/datafusion/comet-spark-spark3.5_2.12/0.6.0/comet-spark-spark3.5_2.12-0.6.0.jar)
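For reference, wiring one of the remaining supported jars into a session looks the same on Spark 3.4 and 3.5; a hedged sketch, assuming the jar is already on the driver and executor classpath and that org.apache.spark.CometPlugin is still the plugin entry point:

    import org.apache.spark.sql.SparkSession

    object CometSessionExample {
      def main(args: Array[String]): Unit = {
        // Assumes e.g. comet-spark-spark3.5_2.12-0.6.0.jar is on the classpath.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("comet-example")
          .config("spark.plugins", "org.apache.spark.CometPlugin")
          .config("spark.comet.enabled", "true")
          .getOrCreate()
        spark.range(10).selectExpr("sum(id)").show()
        spark.stop()
      }
    }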
diff --git a/pom.xml b/pom.xml
index cf82935cd..259760d51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,12 +95,8 @@ under the License.
-Djdk.reflect.useDirectMethodHandle=false
</extraJavaTestArgs>
<argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
- <additional.3_4.test.source>spark-3.4-plus</additional.3_4.test.source>
- <additional.3_5.test.source>not-needed</additional.3_5.test.source>
- <additional.pre35.test.source>spark-pre-3.5</additional.pre35.test.source>
<shims.majorVerSrc>spark-3.x</shims.majorVerSrc>
<shims.minorVerSrc>spark-3.4</shims.minorVerSrc>
- <shims.pre35Src>spark-pre-3.5</shims.pre35Src>
</properties>
<dependencyManagement>
@@ -565,9 +561,6 @@ under the License.
<parquet.version>1.13.1</parquet.version>
<slf4j.version>2.0.7</slf4j.version>
<shims.minorVerSrc>spark-3.5</shims.minorVerSrc>
- <shims.pre35Src>not-needed</shims.pre35Src>
- <additional.pre35.test.source>not-needed</additional.pre35.test.source>
- <additional.3_5.test.source>spark-3.5</additional.3_5.test.source>
</properties>
</profile>
@@ -585,8 +578,6 @@ under the License.
<slf4j.version>2.0.13</slf4j.version>
<shims.majorVerSrc>spark-4.0</shims.majorVerSrc>
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
- <shims.pre35Src>not-needed</shims.pre35Src>
- <additional.pre35.test.source>not-needed</additional.pre35.test.source>
<!-- Use jdk17 by default -->
<java.version>17</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
diff --git a/spark/pom.xml b/spark/pom.xml
index 7cd0b5057..d28b5d29e 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -260,9 +260,6 @@ under the License.
</goals>
<configuration>
<sources>
- <source>src/test/${additional.3_4.test.source}</source>
- <source>src/test/${additional.3_5.test.source}</source>
- <source>src/test/${additional.pre35.test.source}</source>
<source>src/test/${shims.majorVerSrc}</source>
<source>src/test/${shims.minorVerSrc}</source>
</sources>
@@ -278,7 +275,6 @@ under the License.
<sources>
<source>src/main/${shims.majorVerSrc}</source>
<source>src/main/${shims.minorVerSrc}</source>
- <source>src/main/${shims.pre35Src}</source>
</sources>
</configuration>
</execution>
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 613b3ac00..800cab832 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -438,7 +438,7 @@ class CometSparkSessionExtensions
op
}
- case op: LocalLimitExec if getOffset(op) == 0 =>
+ case op: LocalLimitExec =>
val newOp = transform1(op)
newOp match {
case Some(nativeOp) =>
@@ -447,7 +447,7 @@ class CometSparkSessionExtensions
op
}
- case op: GlobalLimitExec if getOffset(op) == 0 =>
+ case op: GlobalLimitExec if op.offset == 0 =>
val newOp = transform1(op)
newOp match {
case Some(nativeOp) =>
@@ -459,10 +459,10 @@ class CometSparkSessionExtensions
case op: CollectLimitExec
            if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf)
&& isCometShuffleEnabled(conf)
- && getOffset(op) == 0 =>
+ && op.offset == 0 =>
QueryPlanSerde.operator2Proto(op) match {
case Some(nativeOp) =>
- val offset = getOffset(op)
+ val offset = op.offset
val cometOp =
CometCollectLimitExec(op, op.limit, offset, op.child)
CometSinkPlaceHolder(nativeOp, op, cometOp)
@@ -742,7 +742,6 @@ class CometSparkSessionExtensions
       // exchange. It is only used for Comet native execution. We only transform Spark broadcast
       // exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
       // broadcast exchange is forced to be enabled by Comet config.
-      // Note that `CometBroadcastExchangeExec` is only supported for Spark 3.4+.
       case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 4b775db02..a3e917f79 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -91,7 +91,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
val pushDownDate = sqlConf.parquetFilterPushDownDate
val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
- val pushDownStringPredicate = getPushDownStringPredicate(sqlConf)
+ val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
val optionsMap = CaseInsensitiveMap[String](options)
val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index 3a430d4c6..01ccde94b 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -63,7 +63,7 @@ case class CometParquetPartitionReaderFactory(
private val pushDownDate = sqlConf.parquetFilterPushDownDate
private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
- private val pushDownStringPredicate = getPushDownStringPredicate(sqlConf)
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
private val parquetFilterPushDown = sqlConf.parquetFilterPushDown
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a1cb5a732..ec94ffd55 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1997,9 +1997,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
case _: ArrayRemove => convert(CometArrayRemove)
case _: ArrayContains => convert(CometArrayContains)
-    // Function introduced in 3.4.0. Refer by name to provide compatibility
-    // with older Spark builds
- case _ if expr.prettyName == "array_append" => convert(CometArrayAppend)
+ case _: ArrayAppend => convert(CometArrayAppend)
case _: ArrayIntersect => convert(CometArrayIntersect)
case _: ArrayJoin => convert(CometArrayJoin)
case _: ArraysOverlap => convert(CometArraysOverlap)
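The hunk above replaces a name-based match with a type-based one, which is possible now that ArrayAppend exists in every supported Spark version; a simplified sketch of the two styles (the convert helper here is a hypothetical stand-in for the serde dispatch):

    import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, Expression}

    object MatchStyleExample {
      // Hypothetical stand-in for QueryPlanSerde's convert(...) call.
      def convert(name: String): Option[String] = Some(name)

      def serialize(expr: Expression): Option[String] = expr match {
        // Type-based match, valid once Spark 3.4 is the minimum supported version.
        case _: ArrayAppend => convert("array_append")
        // The removed branch matched on expr.prettyName == "array_append" instead,
        // so it could compile against Spark builds that predate the class.
        case _ => None
      }
    }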
diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
index da5e9ff53..0284e553c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
import scala.collection.JavaConverters.asJavaIterableConverter
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Covariance, CovPopulation, CovSample, First, Last, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ByteType, DecimalType, IntegerType, LongType, ShortType, StringType}
@@ -140,7 +140,7 @@ object CometAverage extends CometAggregateExpressionSerde with ShimQueryPlanSerd
return None
}
- if (!isLegacyMode(avg)) {
+ if (avg.evalMode != EvalMode.LEGACY) {
withInfo(aggExpr, "Average is only supported in legacy mode")
return None
}
@@ -195,7 +195,7 @@ object CometSum extends CometAggregateExpressionSerde with ShimQueryPlanSerde {
return None
}
- if (!isLegacyMode(sum)) {
+ if (sum.evalMode != EvalMode.LEGACY) {
withInfo(aggExpr, "Sum is only supported in legacy mode")
return None
}
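A compact sketch of the eval-mode check that now replaces the isLegacyMode shim (the aggregate construction is illustrative):

    import org.apache.spark.sql.catalyst.expressions.{EvalMode, Literal}
    import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

    object EvalModeCheckExample {
      def main(args: Array[String]): Unit = {
        val sum = Sum(Literal(1L))
        // Comet only serializes the aggregate in legacy eval mode; ANSI/TRY fall back to Spark.
        println(sum.evalMode == EvalMode.LEGACY)
      }
    }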
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index c4adf48e9..835fa515b 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -25,7 +25,7 @@ import java.util.Collections
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.comet.shims.ShimCometDriverPlugin
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
@@ -57,7 +57,7 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
       // By default, executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384MB
       val executorMemory = sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY_DEFAULT)
-      val memoryOverheadFactor = getMemoryOverheadFactor(sc.getConf)
+      val memoryOverheadFactor = sc.getConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
       val memoryOverheadMinMib = getMemoryOverheadMinMib(sc.getConf)
       Math.max((executorMemory * memoryOverheadFactor).toLong, memoryOverheadMinMib)
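The overhead formula itself is unchanged; a small sketch of the computation using the public config key (the plugin above reads the same setting through the private[spark] EXECUTOR_MEMORY_OVERHEAD_FACTOR entry; the numbers here are illustrative):

    import org.apache.spark.SparkConf

    object OverheadExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.executor.memory", "4g")
          .set("spark.executor.memoryOverheadFactor", "0.2")
        val executorMemoryMib = conf.getSizeAsMb("spark.executor.memory", "1024m")
        val factor = conf.getDouble("spark.executor.memoryOverheadFactor", 0.1)
        // max(factor * executor memory, a 384 MiB floor), as in the driver plugin.
        println(math.max((executorMemoryMib * factor).toLong, 384L))
      }
    }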
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index dc1f2db8b..220df65ac 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.comet
import org.apache.spark.rdd._
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read._
@@ -33,12 +33,11 @@ import org.apache.spark.sql.vectorized._
import com.google.common.base.Objects
import org.apache.comet.{DataTypeSupport, MetricsSupport}
-import org.apache.comet.shims.ShimCometBatchScanExec
 case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expression])
extends DataSourceV2ScanExecBase
- with ShimCometBatchScanExec
with CometPlan {
+ def ordering: Option[Seq[SortOrder]] = wrapped.ordering
wrapped.logicalLink.foreach(setLogicalLink)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 6bc519ab9..3285159be 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -29,9 +29,7 @@ import scala.concurrent.duration.NANOSECONDS
import scala.util.control.NonFatal
 import org.apache.spark.{broadcast, Partition, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.comet.shims.ShimCometBroadcastExchangeExec
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -61,16 +59,12 @@ import org.apache.comet.CometRuntimeException
  * Note that this class cannot extend `CometExec` as usual similar to other Comet operators. As
  * the trait `BroadcastExchangeLike` in Spark extends abstract class `Exchange`, it limits the
  * flexibility to extend `CometExec` and `Exchange` at the same time.
- *
- * Note that this only supports Spark 3.4 and later, because the serialization class
- * `ChunkedByteBuffer` is only serializable in Spark 3.4 and later.
*/
case class CometBroadcastExchangeExec(
originalPlan: SparkPlan,
override val output: Seq[Attribute],
override val child: SparkPlan)
- extends BroadcastExchangeLike
- with ShimCometBroadcastExchangeExec {
+ extends BroadcastExchangeLike {
import CometBroadcastExchangeExec._
override val runId: UUID = UUID.randomUUID
@@ -181,7 +175,8 @@ case class CometBroadcastExchangeExec(
longMetric("buildTime") += NANOSECONDS.toMillis(beforeBroadcast -
beforeBuild)
// (3.4 only) SPARK-39983 - Broadcast the relation without caching the
unserialized object.
- val broadcasted = doBroadcast(sparkContext, batches)
+ val broadcasted = sparkContext
+ .broadcastInternal(batches, true)
.asInstanceOf[broadcast.Broadcast[Any]]
longMetric("broadcastTime") += NANOSECONDS.toMillis(System.nanoTime()
- beforeBroadcast)
val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
@@ -193,16 +188,7 @@ case class CometBroadcastExchangeExec(
       // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
// will catch this exception and re-throw the wrapped fatal throwable.
case oe: OutOfMemoryError =>
-        // Spark 3.4 has two parameters for `notEnoughMemoryToBuildAndBroadcastTableError`, which
-        // is different to Spark 3.3. We simply create the error message here.
-        val error =
-          new OutOfMemoryError(
-            "Not enough memory to build and broadcast the table to all " +
-              "worker nodes. As a workaround, you can either disable broadcast by setting " +
-              s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " +
-              s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.")
- .initCause(oe.getCause)
- val ex = new SparkFatalException(error)
+ val ex = new SparkFatalException(oe)
promise.tryFailure(ex)
throw ex
case e if !NonFatal(e) =>
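Note that SparkContext.broadcastInternal is private[spark], so the direct call above only compiles because the Comet operator lives under the org.apache.spark package tree; a minimal sketch of the call shape under that same assumption:

    package org.apache.spark.sql.comet.example

    import scala.reflect.ClassTag

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    object BroadcastInternalExample {
      // Broadcast without caching an unserialized copy on the driver (SPARK-39983),
      // mirroring the call in CometBroadcastExchangeExec above.
      def broadcastSerializedOnly[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] =
        sc.broadcastInternal(value, true)
    }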
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index 7c8db1ab9..9e97c06d5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.comet.shims.ShimCometScanExec
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
@@ -307,7 +308,7 @@ case class CometScanExec(
.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath.toString()).getName)
-          .getOrElse(throw invalidBucketFile(f.filePath.toString(), sparkContext.version))
+          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath.toString()))
}
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
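The error helper that replaces the reflection-based shim call can be used directly; a small sketch (the path is made up, and the helper may be package-private to org.apache.spark.sql, hence the package declaration):

    package org.apache.spark.sql.comet.example

    import org.apache.spark.sql.errors.QueryExecutionErrors

    object InvalidBucketFileExample {
      def main(args: Array[String]): Unit = {
        // Same INVALID_BUCKET_FILE error the scan node throws when a file name
        // does not carry a bucket id.
        val err = QueryExecutionErrors.invalidBucketFile("/tmp/part-00000.parquet")
        println(err.getMessage)
      }
    }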
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 19586628a..885253868 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.comet.serde.QueryPlanSerde.exprToProto
import org.apache.comet.serde.QueryPlanSerde.supportedSortType
-import org.apache.comet.shims.ShimCometTakeOrderedAndProjectExec
/**
* Comet physical plan node for Spark `TakeOrderedAndProjectExec`.
@@ -130,12 +129,12 @@ case class CometTakeOrderedAndProjectExec(
this.copy(child = newChild)
}
-object CometTakeOrderedAndProjectExec extends ShimCometTakeOrderedAndProjectExec {
+object CometTakeOrderedAndProjectExec {
// TODO: support offset for Spark 3.4
def isSupported(plan: TakeOrderedAndProjectExec): Boolean = {
val exprs = plan.projectList.map(exprToProto(_, plan.child.output))
val sortOrders = plan.sortOrder.map(exprToProto(_, plan.child.output))
-    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && getOffset(plan).getOrElse(
-      0) == 0 && supportedSortType(plan, plan.sortOrder)
+    exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && plan.offset == 0 &&
+ supportedSortType(plan, plan.sortOrder)
}
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 7c27fb196..f2af4402d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -617,18 +617,3 @@ class CometShuffleWriteProcessor(
}
}
}
-
-/**
- * Copied from Spark `PartitionIdPassthrough` as it is private in Spark 3.3.
- */
-private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
- override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-}
-
-/**
- * Copied from Spark `ConstantPartitioner` as it doesn't exist in Spark 3.3.
- */
-private[spark] class ConstantPartitioner extends Partitioner {
- override def numPartitions: Int = 1
- override def getPartition(key: Any): Int = 0
-}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index f36b41aa6..3b9c6bdbc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -31,11 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expre
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, HashPartitioningLike, Partitioning, PartitioningCollection, UnknownPartitioning}
 import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
-import org.apache.spark.sql.comet.plans.PartitioningPreservingUnaryExecNode
 import org.apache.spark.sql.comet.util.Utils
 import org.apache.spark.sql.execution.{BinaryExecNode, ColumnarToRowExec, ExecSubqueryExpression, ExplainUtils, LeafExecNode, ScalarSubquery, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.PartitioningPreservingUnaryExecNode
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -47,7 +47,6 @@ import com.google.common.base.Objects
import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException}
import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.shims.ShimCometBroadcastHashJoinExec
/**
* A Comet physical operator
@@ -793,8 +792,7 @@ case class CometBroadcastHashJoinExec(
override val left: SparkPlan,
override val right: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
- extends CometBinaryExec
- with ShimCometBroadcastHashJoinExec {
+ extends CometBinaryExec {
   // The following logic of `outputPartitioning` is copied from Spark `BroadcastHashJoinExec`.
protected lazy val streamedPlan: SparkPlan = buildSide match {
@@ -883,8 +881,13 @@ case class CometBroadcastHashJoinExec(
}
}
+    val hashPartitioningLikeExpressions =
+      partitioning match {
+        case p: HashPartitioningLike => p.expressions
+        case _ => Seq()
+      }
PartitioningCollection(
-      generateExprCombinations(getHashPartitioningLikeExpressions(partitioning), Nil)
+      generateExprCombinations(hashPartitioningLikeExpressions, Nil)
         .map(exprs => partitioning.withNewChildren(exprs).asInstanceOf[Partitioning]))
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
index f28ca2bba..1388f7d97 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/plans/AliasAwareOutputExpression.scala
@@ -23,16 +23,13 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.internal.SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT
/**
  * A trait that provides functionality to handle aliases in the `outputExpressions`.
*/
trait AliasAwareOutputExpression extends SQLConfHelper {
- // `SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT` is Spark 3.4+ only.
- // Use a default value for now.
-  protected val aliasCandidateLimit: Int =
-    conf.getConfString("spark.sql.optimizer.expressionProjectionCandidateLimit", "100").toInt
+  protected val aliasCandidateLimit: Int = conf.getConf(EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
protected def outputExpressions: Seq[NamedExpression]
/**
@@ -76,7 +73,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
*/
protected def projectExpression(expr: Expression): Stream[Expression] = {
val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
- multiTransformDown(expr) {
+ expr.multiTransformDown {
// Mapping with aliases
case e: Expression if aliasMap.contains(e.canonicalized) =>
         aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty)
@@ -85,60 +82,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
     // This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty`
// there.
case a: Attribute if !outputSet.contains(a) => Seq.empty
- }
- }
-
- // Copied from Spark 3.4+ to make it available in Spark 3.3+.
-  def multiTransformDown(expr: Expression)(
-      rule: PartialFunction[Expression, Seq[Expression]]): Stream[Expression] = {
-
- // We could return `Seq(this)` if the `rule` doesn't apply and handle both
- // - the doesn't apply
- // - and the rule returns a one element `Seq(originalNode)`
-    // cases together. The returned `Seq` can be a `Stream` and unfortunately it doesn't seem like
-    // there is a way to match on a one element stream without eagerly computing the tail's head.
-    // This contradicts with the purpose of only taking the necessary elements from the
-    // alternatives. I.e. the "multiTransformDown is lazy" test case in `TreeNodeSuite` would fail.
-    // Please note that this behaviour has a downside as well that we can only mark the rule on the
-    // original node ineffective if the rule didn't match.
- var ruleApplied = true
- val afterRules = CurrentOrigin.withOrigin(expr.origin) {
- rule.applyOrElse(
- expr,
- (_: Expression) => {
- ruleApplied = false
- Seq.empty
- })
- }
-
- val afterRulesStream = if (afterRules.isEmpty) {
- if (ruleApplied) {
- // If the rule returned with empty alternatives then prune
- Stream.empty
- } else {
- // If the rule was not applied then keep the original node
- Stream(expr)
- }
- } else {
- // If the rule was applied then use the returned alternatives
- afterRules.toStream.map { afterRule =>
- if (expr fastEquals afterRule) {
- expr
- } else {
- afterRule.copyTagsFrom(expr)
- afterRule
- }
- }
- }
-
- afterRulesStream.flatMap { afterRule =>
- if (afterRule.containsChild.nonEmpty) {
-        generateCartesianProduct(afterRule.children.map(c => () => multiTransformDown(c)(rule)))
- .map(afterRule.withNewChildren)
- } else {
- Stream(afterRule)
- }
- }
+ }.toStream
}
   def generateCartesianProduct[T](elementSeqs: Seq[() => Seq[T]]): Stream[Seq[T]] = {
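The hand-copied helper above is dropped in favor of Expression.multiTransformDown, which is available once Spark 3.4 is the floor; a minimal sketch of the call shape (attributes and the alias rule are illustrative):

    import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Expression, Literal}
    import org.apache.spark.sql.types.IntegerType

    object MultiTransformExample {
      def main(args: Array[String]): Unit = {
        val a = AttributeReference("a", IntegerType)()
        val b = AttributeReference("b", IntegerType)()
        val expr: Expression = Add(a, Literal(1))
        // The rule may return several alternatives per node; the result enumerates
        // every combination lazily (here: a + 1 and b + 1).
        val alternatives = expr.multiTransformDown { case `a` => Seq(a, b) }.toStream
        alternatives.foreach(e => println(e.sql))
      }
    }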
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
deleted file mode 100644
index d584e8609..000000000
--- a/spark/src/main/scala/org/apache/spark/sql/comet/plans/PartitioningPreservingUnaryExecNode.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.comet.plans
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
-import org.apache.spark.sql.execution.UnaryExecNode
-
-/**
- * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that
- * satisfies distribution requirements.
- *
- * This is copied from Spark's `PartitioningPreservingUnaryExecNode` because it is only available
- * in Spark 3.4+. This is a workaround to make it available in Spark 3.3+.
- */
-trait PartitioningPreservingUnaryExecNode extends UnaryExecNode with AliasAwareOutputExpression {
- final override def outputPartitioning: Partitioning = {
- val partitionings: Seq[Partitioning] = if (hasAlias) {
- flattenPartitioning(child.outputPartitioning).flatMap {
- case e: Expression =>
- // We need unique partitionings but if the input partitioning is
-          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after
-          // the projection we have 4 partitionings:
-          // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
-          // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
-          // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
- val partitioningSet = mutable.Set.empty[Expression]
- projectExpression(e)
- .filter(e => partitioningSet.add(e.canonicalized))
- .take(aliasCandidateLimit)
- .asInstanceOf[Stream[Partitioning]]
- case o => Seq(o)
- }
- } else {
-      // Filter valid partitiongs (only reference output attributes of the current plan node)
- val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
- flattenPartitioning(child.outputPartitioning).filter {
- case e: Expression => e.references.subsetOf(outputSet)
- case _ => true
- }
- }
- partitionings match {
- case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
- case Seq(p) => p
- case ps => PartitioningCollection(ps)
- }
- }
-
-  private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = {
- partitioning match {
- case PartitioningCollection(childPartitionings) =>
- childPartitionings.flatMap(flattenPartitioning)
- case rest =>
- rest +: Nil
- }
- }
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSQLConf.scala
similarity index 80%
rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
rename to spark/src/main/spark-3.4/org/apache/comet/shims/ShimSQLConf.scala
index 5a8ac97b3..0bff426c2 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/spark-3.4/org/apache/comet/shims/ShimSQLConf.scala
@@ -19,8 +19,9 @@
package org.apache.comet.shims
-import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
-trait ShimCometTakeOrderedAndProjectExec {
-  protected def getOffset(plan: TakeOrderedAndProjectExec): Option[Int] = Some(plan.offset)
+trait ShimSQLConf {
+ protected val LEGACY = LegacyBehaviorPolicy.LEGACY
+ protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
}
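A sketch of how a consumer mixes in the shim so the version-specific location of LegacyBehaviorPolicy stays hidden behind the shared trait:

    import org.apache.comet.shims.ShimSQLConf

    object RebaseModeExample extends ShimSQLConf {
      // LEGACY / CORRECTED resolve to the right enum for the active Spark profile.
      def isCorrected(mode: String): Boolean = mode == CORRECTED.toString
    }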
diff --git a/spark/src/main/spark-pre-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
similarity index 59%
rename from spark/src/main/spark-pre-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
rename to spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index a25be3bc0..1bd3c7c98 100644
--- a/spark/src/main/spark-pre-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -20,8 +20,9 @@
package org.apache.spark.sql.comet.shims
import org.apache.comet.shims.ShimFileFormat
+
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.SparkException
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
@@ -34,57 +35,21 @@ import org.apache.spark.sql.types.StructType
trait ShimCometScanExec {
def wrapped: FileSourceScanExec
- // TODO: remove after dropping Spark 3.3 support
-  lazy val metadataColumns: Seq[AttributeReference] = wrapped.getClass.getDeclaredMethods
- .filter(_.getName == "metadataColumns")
- .map { a => a.setAccessible(true); a }
- .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
-
- // TODO: remove after dropping Spark 3.3 support and directly call
- // wrapped.fileConstantMetadataColumns
lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
- wrapped.getClass.getDeclaredMethods
- .filter(_.getName == "fileConstantMetadataColumns")
- .map { a => a.setAccessible(true); a }
- .flatMap(_.invoke(wrapped).asInstanceOf[Seq[AttributeReference]])
+ wrapped.fileConstantMetadataColumns
-  // TODO: remove after dropping Spark 3.3 and 3.4 support and directly call new FileScanRDD
protected def newFileScanRDD(
fsRelation: HadoopFsRelation,
readFunction: PartitionedFile => Iterator[InternalRow],
filePartitions: Seq[FilePartition],
readSchema: StructType,
- options: ParquetOptions): FileScanRDD =
- classOf[FileScanRDD].getDeclaredConstructors
- // Prevent to pick up incorrect constructors from any custom Spark forks.
- .filter(c => List(5, 6).contains(c.getParameterCount()))
- .map { c =>
- c.getParameterCount match {
- case 5 =>
-            c.newInstance(fsRelation.sparkSession, readFunction, filePartitions, readSchema, metadataColumns)
- case 6 =>
- c.newInstance(
- fsRelation.sparkSession,
- readFunction,
- filePartitions,
- readSchema,
- fileConstantMetadataColumns,
- options)
- }
- }
- .last
- .asInstanceOf[FileScanRDD]
-
- // TODO: remove after dropping Spark 3.3 support and directly call
- // QueryExecutionErrors.SparkException
-  protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
-    val messageParameters = if (sparkVersion >= "3.4") Map("path" -> path) else Array(path)
- classOf[SparkException].getDeclaredConstructors
- .filter(_.getParameterCount == 3)
- .map(_.newInstance("INVALID_BUCKET_FILE", messageParameters, null))
- .last
- .asInstanceOf[SparkException]
- }
+ options: ParquetOptions): FileScanRDD = new FileScanRDD(
+ fsRelation.sparkSession,
+ readFunction,
+ filePartitions,
+ readSchema,
+ fileConstantMetadataColumns,
+ options)
protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
     // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634)
diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
index aafe4aa7c..bdb273946 100644
--- a/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
+++ b/spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
@@ -20,17 +20,8 @@
package org.apache.comet.shims
import org.apache.spark.sql.internal.LegacyBehaviorPolicy
-import org.apache.spark.sql.internal.SQLConf
trait ShimSQLConf {
-
- /**
- * Spark 3.4 renamed parquetFilterPushDownStringStartWith to
- * parquetFilterPushDownStringPredicate
- */
- protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
- sqlConf.parquetFilterPushDownStringPredicate
-
protected val LEGACY = LegacyBehaviorPolicy.LEGACY
protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
}
diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index e4a5584aa..cee444e3f 100644
--- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -52,9 +51,6 @@ trait ShimCometScanExec {
fsRelation.fileFormat.fileConstantMetadataExtractors,
options)
-  protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
- QueryExecutionErrors.invalidBucketFile(path)
-
// see SPARK-39634
protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
deleted file mode 100644
index 49e931110..000000000
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-trait ShimCometBatchScanExec {
- def wrapped: BatchScanExec
-
- // Only for Spark 3.4+
- def ordering: Option[Seq[SortOrder]] = wrapped.getClass.getDeclaredMethods
- .filter(_.getName == "ordering")
- .flatMap(_.invoke(wrapped).asInstanceOf[Option[Seq[SortOrder]]])
- .headOption
-}
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
deleted file mode 100644
index 442bb9e58..000000000
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-
-trait ShimCometBroadcastHashJoinExec {
-
- /**
- * Returns the expressions that are used for hash partitioning including `HashPartitioning` and
- * `CoalescedHashPartitioning`. They shares same trait `HashPartitioningLike` since Spark 3.4,
- * but Spark 3.3 doesn't have `HashPartitioningLike` and `CoalescedHashPartitioning`.
- *
- * TODO: remove after dropping Spark 3.3 support.
- */
- def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] = {
- partitioning.getClass.getDeclaredMethods
- .filter(_.getName == "expressions")
- .flatMap(_.invoke(partitioning).asInstanceOf[Seq[Expression]])
- }
-}
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
index 965b6851e..319803544 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.types.{StructField, StructType}
trait ShimCometShuffleExchangeExec {
- // TODO: remove after dropping Spark 3.3 support
+ // TODO: remove after dropping Spark 3.4 support
def apply(s: ShuffleExchangeExec, shuffleType: ShuffleType): CometShuffleExchangeExec = {
val advisoryPartitionSize = s.getClass.getDeclaredMethods
.filter(_.getName == "advisoryPartitionSize")
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index c8aeacf2a..ab20cd7bd 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -19,27 +19,15 @@
package org.apache.comet.shims
-import org.apache.spark.sql.execution.{LimitExec, QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
trait ShimCometSparkSessionExtensions {
- /**
- * TODO: delete after dropping Spark 3.3 support
- */
- def getOffset(limit: LimitExec): Int = getOffsetOpt(limit).getOrElse(0)
-
/**
* TODO: delete after dropping Spark 3.x support and directly call
* SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
*/
protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = "spark.sql.extendedExplainProviders"
- private def getOffsetOpt(plan: SparkPlan): Option[Int] = plan.getClass.getDeclaredFields
- .filter(_.getName == "offset")
- .map { a => a.setAccessible(true); a.get(plan) }
- .filter(_.isInstanceOf[Int])
- .map(_.asInstanceOf[Int])
- .headOption
-
// Extended info is available only since Spark 4.0.0
// (https://issues.apache.org/jira/browse/SPARK-47289)
def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
index 1b0996d9d..c47b399cf 100644
--- a/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
+++ b/spark/src/main/spark-3.x/org/apache/comet/shims/ShimQueryPlanSerde.scala
@@ -44,22 +44,4 @@ trait ShimQueryPlanSerde {
failOnError.head
}
}
-
- // TODO: delete after drop Spark 3.3 support
- // This method is used to check if the aggregate function is in legacy mode.
- // EvalMode is an enum object in Spark 3.4.
- def isLegacyMode(aggregate: DeclarativeAggregate): Boolean = {
- val evalMode = aggregate.getClass.getDeclaredMethods
- .flatMap(m =>
- m.getName match {
- case "evalMode" => Some(m.invoke(aggregate))
- case _ => None
- })
-
- if (evalMode.isEmpty) {
- true
- } else {
- "legacy".equalsIgnoreCase(evalMode.head.toString)
- }
- }
}
diff --git a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
deleted file mode 100644
index afcf653b4..000000000
--- a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkContext
-import org.apache.spark.broadcast.Broadcast
-
-trait ShimCometBroadcastExchangeExec {
- // TODO: remove after dropping Spark 3.3 support
- protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] = {
- // Spark 3.4 has new API `broadcastInternal` to broadcast the relation without caching the
- // unserialized object.
- val classTag = implicitly[ClassTag[T]]
- val broadcasted = sparkContext.getClass.getDeclaredMethods
- .filter(_.getName == "broadcastInternal")
- .map { a => a.setAccessible(true); a }
- .map { method =>
- method
- .invoke(
- sparkContext.asInstanceOf[Object],
- value.asInstanceOf[Object],
- true.asInstanceOf[Object],
- classTag.asInstanceOf[Object])
- .asInstanceOf[Broadcast[Any]]
- }
- .headOption
- // Fallback to the old API if the new API is not available.
- broadcasted
- .getOrElse(sparkContext.broadcast(value.asInstanceOf[Object]))
- .asInstanceOf[Broadcast[Any]]
- }
-}
diff --git a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
index cfb6a0088..d6a7e2d7f 100644
--- a/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
+++ b/spark/src/main/spark-3.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
@@ -22,17 +22,10 @@ package org.apache.spark.comet.shims
import org.apache.spark.SparkConf
trait ShimCometDriverPlugin {
- // `org.apache.spark.internal.config.EXECUTOR_MEMORY_OVERHEAD_FACTOR` was added since Spark 3.3.0
- private val EXECUTOR_MEMORY_OVERHEAD_FACTOR = "spark.executor.memoryOverheadFactor"
- private val EXECUTOR_MEMORY_OVERHEAD_FACTOR_DEFAULT = 0.1
// `org.apache.spark.internal.config.EXECUTOR_MIN_MEMORY_OVERHEAD` was added since Spark 4.0.0
private val EXECUTOR_MIN_MEMORY_OVERHEAD = "spark.executor.minMemoryOverhead"
private val EXECUTOR_MIN_MEMORY_OVERHEAD_DEFAULT = 384L
- def getMemoryOverheadFactor(sc: SparkConf): Double =
- sc.getDouble(
- EXECUTOR_MEMORY_OVERHEAD_FACTOR,
- EXECUTOR_MEMORY_OVERHEAD_FACTOR_DEFAULT)
def getMemoryOverheadMinMib(sc: SparkConf): Long =
sc.getLong(EXECUTOR_MIN_MEMORY_OVERHEAD, EXECUTOR_MIN_MEMORY_OVERHEAD_DEFAULT)
}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
deleted file mode 100644
index d41502f6e..000000000
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBatchScanExec.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-
-trait ShimCometBatchScanExec {
- def wrapped: BatchScanExec
-
- def ordering: Option[Seq[SortOrder]] = wrapped.ordering
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
deleted file mode 100644
index 1f689b400..000000000
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastHashJoinExec.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioningLike, Partitioning}
-
-trait ShimCometBroadcastHashJoinExec {
- protected def getHashPartitioningLikeExpressions(partitioning: Partitioning): Seq[Expression] =
- partitioning match {
- case p: HashPartitioningLike => p.expressions
- case _ => Seq()
- }
-}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
index 9fb7355ee..e68c0cb3e 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala
@@ -20,17 +20,13 @@
package org.apache.comet.shims
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.execution.{CollectLimitExec, GlobalLimitExec, LocalLimitExec, QueryExecution}
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
trait ShimCometSparkSessionExtensions {
protected def getPushedAggregate(scan: ParquetScan): Option[Aggregation] = scan.pushedAggregate
- protected def getOffset(limit: LocalLimitExec): Int = 0
- protected def getOffset(limit: GlobalLimitExec): Int = limit.offset
- protected def getOffset(limit: CollectLimitExec): Int = limit.offset
-
protected def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = true
protected val EXTENDED_EXPLAIN_PROVIDERS_KEY = SQLConf.EXTENDED_EXPLAIN_PROVIDERS.key
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala
index 4d261f3c2..10821881b 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimQueryPlanSerde.scala
@@ -19,7 +19,7 @@
package org.apache.comet.shims
-import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression, BloomFilterMightContain, EvalMode}
+import org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, BinaryExpression, BloomFilterMightContain}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Average, Sum}
trait ShimQueryPlanSerde {
@@ -29,9 +29,6 @@ trait ShimQueryPlanSerde {
protected def getFailOnError(aggregate: Sum): Boolean = aggregate.initQueryContext().isDefined
protected def getFailOnError(aggregate: Average): Boolean = aggregate.initQueryContext().isDefined
- protected def isLegacyMode(aggregate: Sum): Boolean = aggregate.evalMode.equals(EvalMode.LEGACY)
- protected def isLegacyMode(aggregate: Average): Boolean = aggregate.evalMode.equals(EvalMode.LEGACY)
-
protected def isBloomFilterMightContain(binary: BinaryExpression): Boolean =
binary.isInstanceOf[BloomFilterMightContain]
}
diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
index 574967767..bdb273946 100644
--- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
+++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala
@@ -19,13 +19,9 @@
package org.apache.comet.shims
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.LegacyBehaviorPolicy
trait ShimSQLConf {
- protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
- sqlConf.parquetFilterPushDownStringPredicate
-
protected val LEGACY = LegacyBehaviorPolicy.LEGACY
protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
}
diff --git a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
deleted file mode 100644
index ba87a2515..000000000
--- a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometBroadcastExchangeExec.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.comet.shims
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.SparkContext
-import org.apache.spark.broadcast.Broadcast
-
-trait ShimCometBroadcastExchangeExec {
- protected def doBroadcast[T: ClassTag](sparkContext: SparkContext, value: T): Broadcast[Any] =
- sparkContext.broadcastInternal(value, true)
-}
diff --git a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
index f7a57a642..de3263bea 100644
--- a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
+++ b/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala
@@ -20,13 +20,9 @@
package org.apache.spark.comet.shims
import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.EXECUTOR_MEMORY_OVERHEAD_FACTOR
import org.apache.spark.internal.config.EXECUTOR_MIN_MEMORY_OVERHEAD
trait ShimCometDriverPlugin {
- protected def getMemoryOverheadFactor(sparkConf: SparkConf): Double = sparkConf.get(
- EXECUTOR_MEMORY_OVERHEAD_FACTOR)
-
protected def getMemoryOverheadMinMib(sparkConf: SparkConf): Long = sparkConf.get(
EXECUTOR_MIN_MEMORY_OVERHEAD)
}
diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 3edc43278..7fe9ea53a 100644
--- a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceConstantMetadataAttribute, Literal}
-import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, ScalarSubquery}
@@ -53,9 +52,6 @@ trait ShimCometScanExec {
options)
}
- protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
- QueryExecutionErrors.invalidBucketFile(path)
-
// see SPARK-39634
protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
diff --git a/spark/src/main/spark-pre-3.5/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-pre-3.5/org/apache/comet/shims/ShimSQLConf.scala
deleted file mode 100644
index 579db51c3..000000000
--- a/spark/src/main/spark-pre-3.5/org/apache/comet/shims/ShimSQLConf.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.shims
-
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
-
-trait ShimSQLConf {
-
- /**
- * Spark 3.4 renamed parquetFilterPushDownStringStartWith to
- * parquetFilterPushDownStringPredicate
- *
- * TODO: delete after dropping Spark 3.3 support and simply use
- * parquetFilterPushDownStringPredicate
- */
- protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
- sqlConf.getClass.getMethods
- .flatMap(m =>
- m.getName match {
- case "parquetFilterPushDownStringStartWith" |
"parquetFilterPushDownStringPredicate" =>
- Some(m.invoke(sqlConf).asInstanceOf[Boolean])
- case _ => None
- })
- .head
-
- protected val LEGACY = LegacyBehaviorPolicy.LEGACY
- protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
-}
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 7531a9b9a..68febbab2 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1163,7 +1163,6 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
hasIncompatibleType: Boolean = false,
testAnsi: Boolean = true): Unit = {
- // we now support the TryCast expression in Spark 3.3
withTempPath { dir =>
val data = roundtripParquet(input, dir).coalesce(1)
data.createOrReplaceTempView("t")
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 48da86a74..c3bd2efef 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2689,32 +2689,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
randomSize = 10000)
withParquetTable(path1.toString, "tbl1") {
withParquetTable(path2.toString, "tbl2") {
- // disable broadcast, as comet on spark 3.3 does not support broadcast exchange
- withSQLConf(
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
- checkSparkAnswerAndOperator("""
- |select
- | t1._2 div t2._2, div(t1._2, t2._2),
- | t1._3 div t2._3, div(t1._3, t2._3),
- | t1._4 div t2._4, div(t1._4, t2._4),
- | t1._5 div t2._5, div(t1._5, t2._5),
- | t1._9 div t2._9, div(t1._9, t2._9),
- | t1._10 div t2._10, div(t1._10, t2._10),
- | t1._11 div t2._11, div(t1._11, t2._11)
- | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
- | order by t1._id""".stripMargin)
-
- // decimal support requires Spark 3.4 or later
- checkSparkAnswerAndOperator("""
- |select
- | t1._12 div t2._12, div(t1._12, t2._12),
- | t1._15 div t2._15, div(t1._15, t2._15),
- | t1._16 div t2._16, div(t1._16, t2._16),
- | t1._17 div t2._17, div(t1._17, t2._17)
- | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
- | order by t1._id""".stripMargin)
- }
+ checkSparkAnswerAndOperator("""
+ |select
+ | t1._2 div t2._2, div(t1._2, t2._2),
+ | t1._3 div t2._3, div(t1._3, t2._3),
+ | t1._4 div t2._4, div(t1._4, t2._4),
+ | t1._5 div t2._5, div(t1._5, t2._5),
+ | t1._9 div t2._9, div(t1._9, t2._9),
+ | t1._10 div t2._10, div(t1._10, t2._10),
+ | t1._11 div t2._11, div(t1._11, t2._11)
+ | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+ | order by t1._id""".stripMargin)
+
+ checkSparkAnswerAndOperator("""
+ |select
+ | t1._12 div t2._12, div(t1._12, t2._12),
+ | t1._15 div t2._15, div(t1._15, t2._15),
+ | t1._16 div t2._16, div(t1._16, t2._16),
+ | t1._17 div t2._17, div(t1._17, t2._17)
+ | from tbl1 t1 join tbl2 t2 on t1._id = t2._id
+ | order by t1._id""".stripMargin)
}
}
}
diff --git a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExec3_4PlusSuite.scala
similarity index 93%
rename from spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
rename to spark/src/test/scala/org/apache/comet/exec/CometExec3_4PlusSuite.scala
index 931f6900a..71060de64 100644
--- a/spark/src/test/spark-3.4-plus/org/apache/comet/exec/CometExec3_4PlusSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExec3_4PlusSuite.scala
@@ -20,15 +20,20 @@
package org.apache.comet.exec
import java.io.ByteArrayOutputStream
+
import scala.util.Random
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
import org.apache.spark.sql.{Column, CometTestBase}
-import org.apache.comet.CometConf
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.util.sketch.BloomFilter
-import org.scalactic.source.Position
-import org.scalatest.Tag
+
+import org.apache.comet.CometConf
+
/**
* This test suite contains tests for only Spark 3.4+.
*/
@@ -40,7 +45,8 @@ class CometExec3_4PlusSuite extends CometTestBase {
override def beforeAll(): Unit = {
super.beforeAll()
// Register 'might_contain' to builtin.
- spark.sessionState.functionRegistry.registerFunction(func_might_contain,
+ spark.sessionState.functionRegistry.registerFunction(
+ func_might_contain,
new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
(children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
}
@@ -126,8 +132,7 @@ class CometExec3_4PlusSuite extends CometTestBase {
withTable(table) {
sql(s"create table $table(col1 long, col2 int) using parquet")
sql(s"insert into $table values (201, 1)")
- checkSparkAnswerAndOperator(
- s"""
+ checkSparkAnswerAndOperator(s"""
|SELECT might_contain(
|X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', col1) FROM $table
|""".stripMargin)
@@ -140,8 +145,7 @@ class CometExec3_4PlusSuite extends CometTestBase {
withTable(table) {
sql(s"create table $table(col1 long, col2 int) using parquet")
sql(s"insert into $table values (201, 1), (null, 2)")
- checkSparkAnswerAndOperator(
- s"""
+ checkSparkAnswerAndOperator(s"""
|SELECT might_contain(null, null) both_null,
| might_contain(null, 1L) null_bf,
| might_contain(
@@ -157,18 +161,26 @@ class CometExec3_4PlusSuite extends CometTestBase {
withTable(table) {
sql(s"create table $table(col1 long, col2 binary) using parquet")
- spark.createDataset(longs).map(x => (x, bfBytes)).toDF("col1", "col2").write.insertInto(table)
- val df = spark.table(table).select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
+ spark
+ .createDataset(longs)
+ .map(x => (x, bfBytes))
+ .toDF("col1", "col2")
+ .write
+ .insertInto(table)
+ val df = spark
+ .table(table)
+ .select(new Column(BloomFilterMightContain(lit(bfBytes).expr, col("col1").expr)))
checkSparkAnswerAndOperator(df)
// check with scalar subquery
- checkSparkAnswerAndOperator(
- s"""
+ checkSparkAnswerAndOperator(s"""
|SELECT might_contain((select first(col2) as col2 from $table), col1) FROM $table
|""".stripMargin)
}
}
- private def bloomFilterFromRandomInput(expectedItems: Long, expectedBits: Long): (Seq[Long], Array[Byte]) = {
+ private def bloomFilterFromRandomInput(
+ expectedItems: Long,
+ expectedBits: Long): (Seq[Long], Array[Byte]) = {
val bf = BloomFilter.create(expectedItems, expectedBits)
val longs = (0 until expectedItems.toInt).map(_ => Random.nextLong())
longs.foreach(bf.put)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 3b4a78690..91a8bdf8b 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -371,8 +371,7 @@ class CometExecSuite extends CometTestBase {
Seq("true", "false").foreach { aqeEnabled =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled,
- // `REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION` is a new config in Spark 3.3+.
- "spark.sql.requireAllClusterKeysForDistribution" -> "true",
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df =
Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1",
"key2", "value")
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
index 6c5951426..5128a44f4 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala
@@ -146,7 +146,7 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
val (schema, output) = handleExceptions(getNormalizedQueryExecutionResult(spark, query))
val queryString = query.trim
val outputString = output.mkString("\n").replaceAll("\\s+$", "")
- if (shouldRegenerateGoldenFiles) {
+ if (regenerateGoldenFiles) {
val goldenOutput = {
s"-- Automatically generated by ${getClass.getSimpleName}\n\n" +
"-- !query schema\n" +
@@ -227,7 +227,7 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
val allJoinConfCombinations: Seq[Map[String, String]] =
Seq(sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf)
- val joinConfs: Seq[Map[String, String]] = if (shouldRegenerateGoldenFiles) {
+ val joinConfs: Seq[Map[String, String]] = if (regenerateGoldenFiles) {
require(
!sys.env.contains("SPARK_TPCH_JOIN_CONF"),
"'SPARK_TPCH_JOIN_CONF' cannot be set together with
'SPARK_GENERATE_GOLDEN_FILES'")
@@ -269,8 +269,4 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery
} else {
ignore("skipped because env `SPARK_TPCH_DATA` is not set") {}
}
-
- // TODO: remove once Spark 3.3 is no longer supported
- private def shouldRegenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 2855e553c..9f611612f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -282,7 +282,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
assert(ValidateRequirements.validate(plan))
- if (shouldRegenerateGoldenFiles) {
+ if (regenerateGoldenFiles) {
generateGoldenFile(plan, query + suffix, explain)
} else {
checkWithApproved(plan, query + suffix, explain)
@@ -306,10 +306,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
new TestSparkSession(new SparkContext("local[1]", this.getClass.getCanonicalName, conf))
}
-
- // TODO: remove once Spark 3.3 is no longer supported
- private val shouldRegenerateGoldenFiles: Boolean =
- System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
}
class CometTPCDSV1_4_PlanStabilitySuite extends CometPlanStabilitySuite {
diff --git a/spark/src/test/spark-pre-3.5/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala b/spark/src/test/spark-3.4/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
similarity index 100%
rename from spark/src/test/spark-pre-3.5/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
rename to spark/src/test/spark-3.4/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]