This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit 9026029bc1903f5e88849c0479abd738295e898c
Author: Chang chen <[email protected]>
AuthorDate: Tue Jan 6 16:52:34 2026 +0800

    Spark: Initial support for 4.1.0

| Cause | Type | Category | Description | Affected Files |
|-------|------|----------|-------------|----------------|
| - | Feat | Feature | Introduce Spark41Shims and update the build configuration to support Spark 4.1. | pom.xml<br>shims/pom.xml<br>shims/spark41/pom.xml<br>shims/spark41/.../META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider<br>shims/spark41/.../spark41/Spark41Shims.scala<br>shims/spark41/.../spark41/SparkShimProvider.scala |
| [#51477](https://github.com/apache/spark/pull/51477) | Fix | Compatibility | Use the class name instead of the class object for streaming call detection to ensure Spark 4.1 compatibility. | gluten-core/.../caller/CallerInfo.scala |
| [#50852](https://github.com/apache/spark/pull/50852) | Fix | Compatibility | Add the printOutputColumns parameter to the generateTreeString methods. | shims/spark41/.../GenerateTreeStringShim.scala |
| [#51775](https://github.com/apache/spark/pull/51775) | Fix | Compatibility | Remove the unused MDC import in FileSourceScanExecShim.scala. | shims/spark41/.../FileSourceScanExecShim.scala |
| [#51979](https://github.com/apache/spark/pull/51979) | Fix | Compatibility | Add the missing StoragePartitionJoinParams import in BatchScanExecShim and AbstractBatchScanExec. | shims/spark41/.../v2/AbstractBatchScanExec.scala<br>shims/spark41/.../v2/BatchScanExecShim.scala |
| [#51302](https://github.com/apache/spark/pull/51302) | Fix | Compatibility | Remove TimeAdd from ExpressionConverter and ExpressionMappings for tests. | gluten-substrait/.../ExpressionConverter.scala<br>gluten-substrait/.../ExpressionMappings.scala |
| [#50598](https://github.com/apache/spark/pull/50598) | Fix | Compatibility | Adapt to the QueryExecution.createSparkPlan interface change. | gluten-substrait/.../GlutenImplicits.scala<br>shims/spark\*/.../shims/spark\*/Spark*Shims.scala |
| [#52599](https://github.com/apache/spark/pull/52599) | Fix | Compatibility | Adapt to the DataSourceV2Relation interface change. | backends-velox/.../ArrowConvertorRule.scala |
| [#52384](https://github.com/apache/spark/pull/52384) | Fix | Compatibility | Use the new ParquetFooterReader interface. | backends-velox/.../ParquetMetadataUtils.scala<br>gluten-ut/spark40/.../parquet/GlutenParquetRowIndexSuite.scala<br>shims/spark*/.../parquet/ParquetFooterReaderShim.scala |
| [#52509](https://github.com/apache/spark/pull/52509) | Fix | Build | Update the Scala version to 2.13.17 in pom.xml to fix `java.lang.NoSuchMethodError: 'java.lang.String scala.util.hashing.MurmurHash3$.caseClassHash$default$2()'`. | pom.xml |
| - | Fix | Test | Refactor the Spark version checks in VeloxHashJoinSuite to improve readability and maintainability. | backends-velox/.../VeloxHashJoinSuite.scala |
| [#50849](https://github.com/apache/spark/pull/50849) | Fix | Test | Fix MiscOperatorSuite to support the OneRowRelationExec plan on Spark 4.1. | backends-velox/.../MiscOperatorSuite.scala |
| [#52723](https://github.com/apache/spark/pull/52723) | Fix | Compatibility | Add GeographyVal and GeometryVal support in ColumnarArrayShim. | shims/spark41/.../vectorized/ColumnarArrayShim.java |
| [#48470](https://github.com/apache/spark/pull/48470) | 4.1.0 | Exclude | Exclude the split test in VeloxStringFunctionsSuite. | backends-velox/.../VeloxStringFunctionsSuite.scala |
| [#51259](https://github.com/apache/spark/pull/51259) | 4.1.0 | Exclude | Only run the ArrowEvalPythonExecSuite tests up to Spark 4.0; the CI Python needs to be updated to 3.10 first. | backends-velox/.../python/ArrowEvalPythonExecSuite.scala |
---
 .github/workflows/util/install-spark-resources.sh | 5 +
 .github/workflows/velox_backend_x86.yml | 106 +++++++++++++++++++++
 .../gluten/extension/ArrowConvertorRule.scala | 44 +++++----
 .../apache/gluten/utils/ParquetMetadataUtils.scala | 4 +-
 .../gluten/execution/MiscOperatorSuite.scala | 6 +-
 .../gluten/execution/VeloxHashJoinSuite.scala | 7 +-
 .../execution/VeloxStringFunctionsSuite.scala | 3 +-
 .../python/ArrowEvalPythonExecSuite.scala | 6 +-
 dev/format-scala-code.sh | 2 +-
 .../gluten/extension/caller/CallerInfo.scala | 9 +-
 .../org/apache/spark/util/SparkVersionUtil.scala | 1 +
 .../gluten/expression/ExpressionConverter.scala | 7 --
 .../gluten/expression/ExpressionMappings.scala | 1 -
 .../spark/sql/execution/GlutenImplicits.scala | 8 +-
 .../parquet/GlutenParquetRowIndexSuite.scala | 2 +-
 pom.xml | 82 +++++++++++++++-
 .../org/apache/gluten/sql/shims/SparkShims.scala | 10 ++
 shims/pom.xml | 6 ++
 .../gluten/sql/shims/spark32/Spark32Shims.scala | 14 ++-
 .../parquet/ParquetFooterReaderShim.scala | 42 ++++++++
 .../gluten/sql/shims/spark33/Spark33Shims.scala | 14 ++-
 .../parquet/ParquetFooterReaderShim.scala | 42 ++++++++
 .../gluten/sql/shims/spark34/Spark34Shims.scala | 12 +++
 .../parquet/ParquetFooterReaderShim.scala | 42 ++++++++
 .../gluten/sql/shims/spark35/Spark35Shims.scala | 12 +++
 .../parquet/ParquetFooterReaderShim.scala | 42 ++++++++
 .../gluten/sql/shims/spark40/Spark40Shims.scala | 13 +++
 .../parquet/ParquetFooterReaderShim.scala | 42 ++++++++
 .../sql/execution/datasources/v2/Spark35Scan.scala | 50 ----------
 shims/spark41/pom.xml | 4 +-
 .../execution/vectorized/ColumnarArrayShim.java | 12 +++
 .../org.apache.gluten.sql.shims.SparkShimProvider | 2 +-
 .../gluten/execution/GenerateTreeStringShim.scala | 6 +-
 .../Spark41Shims.scala} | 16 +++-
 .../{spark40 => spark41}/SparkShimProvider.scala | 4 +-
 .../sql/execution/FileSourceScanExecShim.scala | 1 -
 .../parquet/ParquetFooterReaderShim.scala | 43 +++++++++
 .../datasources/v2/AbstractBatchScanExec.scala | 1 +
 .../datasources/v2/BatchScanExecShim.scala | 1 +
 .../sql/execution/datasources/v2/Spark35Scan.scala | 50 ----------
 40 files changed, 616 insertions(+), 158 deletions(-)

diff --git a/.github/workflows/util/install-spark-resources.sh b/.github/workflows/util/install-spark-resources.sh
index 4d1dd27a9c..d245dbb4ac 100755
--- a/.github/workflows/util/install-spark-resources.sh
+++ b/.github/workflows/util/install-spark-resources.sh
@@ -119,6 +119,11 @@ case "$1" in
     cd ${INSTALL_DIR} && \
     install_spark "4.0.1" "3" "2.12"
     ;;
+4.1)
+    # Spark-4.x, scala 2.12 // using 2.12 as a hack as 4.0 does not have 2.13 suffix
+    cd ${INSTALL_DIR} && \
+    install_spark "4.1.0" "3" "2.12"
+    ;;
 *)
     echo "Spark version is expected to be specified."
exit 1 diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index 9adde6c5ce..5ef70a79bc 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -1481,3 +1481,109 @@ jobs: **/target/*.log **/gluten-ut/**/hs_err_*.log **/gluten-ut/**/core.* + + spark-test-spark41: + needs: build-native-lib-centos-7 + runs-on: ubuntu-22.04 + env: + SPARK_TESTING: true + container: apache/gluten:centos-8-jdk17 + steps: + - uses: actions/checkout@v2 + - name: Download All Artifacts + uses: actions/download-artifact@v4 + with: + name: velox-native-lib-centos-7-${{github.sha}} + path: ./cpp/build/releases + - name: Download Arrow Jars + uses: actions/download-artifact@v4 + with: + name: arrow-jars-centos-7-${{github.sha}} + path: /root/.m2/repository/org/apache/arrow/ + - name: Prepare + run: | + dnf module -y install python39 && \ + alternatives --set python3 /usr/bin/python3.9 && \ + pip3 install setuptools==77.0.3 && \ + pip3 install pyspark==3.5.5 cython && \ + pip3 install pandas==2.2.3 pyarrow==20.0.0 + - name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update + run: | + rm -rf /opt/shims/spark41 + bash .github/workflows/util/install-spark-resources.sh 4.1 + mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13 + - name: Build and Run unit test for Spark 4.1.0 with scala-2.13 (other tests) + run: | + cd $GITHUB_WORKSPACE/ + export SPARK_SCALA_VERSION=2.13 + yum install -y java-17-openjdk-devel + export JAVA_HOME=/usr/lib/jvm/java-17-openjdk + export PATH=$JAVA_HOME/bin:$PATH + java -version + $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \ + -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ + -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest + - name: Upload test report + if: always() + uses: actions/upload-artifact@v4 + with: + name: ${{ github.job }}-report + path: '**/surefire-reports/TEST-*.xml' + - name: Upload unit tests log files + if: ${{ !success() }} + uses: actions/upload-artifact@v4 + with: + name: ${{ github.job }}-test-log + path: | + **/target/*.log + **/gluten-ut/**/hs_err_*.log + **/gluten-ut/**/core.* + + spark-test-spark41-slow: + needs: build-native-lib-centos-7 + runs-on: ubuntu-22.04 + env: + SPARK_TESTING: true + container: apache/gluten:centos-8-jdk17 + steps: + - uses: actions/checkout@v2 + - name: Download All Artifacts + uses: actions/download-artifact@v4 + with: + name: velox-native-lib-centos-7-${{github.sha}} + path: ./cpp/build/releases + - name: Download Arrow Jars + uses: actions/download-artifact@v4 + with: + name: arrow-jars-centos-7-${{github.sha}} + path: /root/.m2/repository/org/apache/arrow/ + - name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update + run: | + rm -rf /opt/shims/spark41 + bash .github/workflows/util/install-spark-resources.sh 4.1 + mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13 + - name: Build and Run unit test for Spark 4.0 (slow tests) + run: | + cd $GITHUB_WORKSPACE/ + yum install -y java-17-openjdk-devel + export JAVA_HOME=/usr/lib/jvm/java-17-openjdk + export PATH=$JAVA_HOME/bin:$PATH + java -version + $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \ + 
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ + -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + - name: Upload test report + if: always() + uses: actions/upload-artifact@v4 + with: + name: ${{ github.job }}-report + path: '**/surefire-reports/TEST-*.xml' + - name: Upload unit tests log files + if: ${{ !success() }} + uses: actions/upload-artifact@v4 + with: + name: ${{ github.job }}-test-log + path: | + **/target/*.log + **/gluten-ut/**/hs_err_*.log + **/gluten-ut/**/core.* diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala index 25371be8d1..925f2a6be9 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala @@ -38,6 +38,24 @@ import java.nio.charset.StandardCharsets import scala.collection.convert.ImplicitConversions.`map AsScala` +/** + * Extracts a CSVTable from a DataSourceV2Relation. + * + * Only the table variable of DataSourceV2Relation is accessed to improve compatibility across + * different Spark versions. + * @since Spark + * 4.1 + */ +private object CSVTableExtractor { + def unapply(relation: DataSourceV2Relation): Option[(DataSourceV2Relation, CSVTable)] = { + relation.table match { + case t: CSVTable => + Some((relation, t)) + case _ => None + } + } +} + @Experimental case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { @@ -56,25 +74,15 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat(csvOptions))(session)) case _ => l } - case d @ DataSourceV2Relation( - t @ CSVTable( - name, - sparkSession, - options, - paths, - userSpecifiedSchema, - fallbackFileFormat), - _, - _, - _, - _) if validate(session, t.dataSchema, options.asCaseSensitiveMap().toMap) => + case CSVTableExtractor(d, t) + if validate(session, t.dataSchema, t.options.asCaseSensitiveMap().toMap) => d.copy(table = ArrowCSVTable( - "arrow" + name, - sparkSession, - options, - paths, - userSpecifiedSchema, - fallbackFileFormat)) + "arrow" + t.name, + t.sparkSession, + t.options, + t.paths, + t.userSpecifiedSchema, + t.fallbackFileFormat)) case r => r } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 6239ab5ad7..ab76cba4aa 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.DataSourceUtils -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, ParquetOptions} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} @@ -135,7 +135,7 @@ object ParquetMetadataUtils extends Logging { parquetOptions: ParquetOptions): Option[String] = { val footer = try { - ParquetFooterReader.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER) + ParquetFooterReaderShim.readFooter(conf, 
fileStatus, ParquetMetadataConverter.NO_FILTER) } catch { case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => return Some("Encrypted Parquet footer detected.") diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index c313242091..cfddfb8e21 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -753,7 +753,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa val df = sql("SELECT 1") checkAnswer(df, Row(1)) val plan = df.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined) + if (isSparkVersionGE("4.1")) { + assert(plan.find(_.getClass.getSimpleName == "OneRowRelationExec").isDefined) + } else { + assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined) + } assert(plan.find(_.isInstanceOf[ProjectExecTransformer]).isDefined) assert(plan.find(_.isInstanceOf[RowToVeloxColumnarExec]).isDefined) } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala index 53f44a2ccc..5958baa377 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala @@ -92,12 +92,9 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite { // The computing is combined into one single whole stage transformer. val wholeStages = plan.collect { case wst: WholeStageTransformer => wst } - if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) { + if (isSparkVersionLE("3.2")) { assert(wholeStages.length == 1) - } else if ( - SparkShimLoader.getSparkVersion.startsWith("3.5.") || - SparkShimLoader.getSparkVersion.startsWith("4.0.") - ) { + } else if (isSparkVersionGE("3.5")) { assert(wholeStages.length == 5) } else { assert(wholeStages.length == 3) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala index 06f0acb784..37f13bcea8 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala @@ -544,7 +544,8 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite { s"from $LINEITEM_TABLE limit 5") { _ => } } - testWithMinSparkVersion("split", "3.4") { + // TODO: fix on spark-4.1 + testWithSpecifiedSparkVersion("split", "3.4", "3.5") { runQueryAndCompare( s"select l_orderkey, l_comment, split(l_comment, '') " + s"from $LINEITEM_TABLE limit 5") { diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala index 52a17995f3..f8e2554da7 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala @@ -39,7 +39,8 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite { .set("spark.executor.cores", "1") } - test("arrow_udf test: 
without projection") { + // TODO: fix on spark-4.1 + testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") { lazy val base = Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0)) .toDF("a", "b") @@ -59,7 +60,8 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite { checkAnswer(df2, expected) } - test("arrow_udf test: with unrelated projection") { + // TODO: fix on spark-4.1 + testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") { lazy val base = Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0)) .toDF("a", "b") diff --git a/dev/format-scala-code.sh b/dev/format-scala-code.sh index 96a782405b..4d2fba5ca1 100755 --- a/dev/format-scala-code.sh +++ b/dev/format-scala-code.sh @@ -22,7 +22,7 @@ MVN_CMD="${BASEDIR}/../build/mvn" # If a new profile is introduced for new modules, please add it here to ensure # the new modules are covered. PROFILES="-Pbackends-velox -Pceleborn,uniffle -Piceberg,delta,hudi,paimon \ - -Pspark-3.2,spark-3.3,spark-3.4,spark-3.5,spark-4.0 -Pspark-ut" + -Pspark-3.2,spark-3.3,spark-3.4,spark-3.5,spark-4.0,spark-4.1 -Pspark-ut" COMMAND=$1 diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala index 732c898285..7dfaaaa774 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala @@ -18,7 +18,7 @@ package org.apache.gluten.extension.caller import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.columnar.InMemoryRelation -import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.util.SparkVersionUtil /** * Helper API that stores information about the call site of the columnar rule. Specific columnar @@ -70,7 +70,12 @@ object CallerInfo { } private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = { - stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head)) + val streamName = if (SparkVersionUtil.gteSpark41) { + "org.apache.spark.sql.execution.streaming.runtime.StreamExecution" + } else { + "org.apache.spark.sql.execution.streaming.StreamExecution" + } + stack.exists(_.getClassName.equals(streamName)) } private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): Boolean = { diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala index efa0c63dca..50114ab702 100644 --- a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala @@ -25,6 +25,7 @@ object SparkVersionUtil { val gteSpark33: Boolean = comparedWithSpark33 >= 0 val gteSpark35: Boolean = comparedWithSpark35 >= 0 val gteSpark40: Boolean = compareMajorMinorVersion((4, 0)) >= 0 + val gteSpark41: Boolean = compareMajorMinorVersion((4, 1)) >= 0 // Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > other. 
def compareMajorMinorVersion(other: (Int, Int)): Int = { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 52f6d31d1d..418de8578f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -856,13 +856,6 @@ object ExpressionConverter extends SQLConfHelper with Logging { dateAdd.children, dateAdd ) - case timeAdd: TimeAdd => - BackendsApiManager.getSparkPlanExecApiInstance.genDateAddTransformer( - attributeSeq, - substraitExprName, - timeAdd.children, - timeAdd - ) case ss: StringSplit => BackendsApiManager.getSparkPlanExecApiInstance.genStringSplitTransformer( substraitExprName, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala index b0b7c80793..b13aced2a6 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala @@ -179,7 +179,6 @@ object ExpressionMappings { Sig[Second](EXTRACT), Sig[FromUnixTime](FROM_UNIXTIME), Sig[DateAdd](DATE_ADD), - Sig[TimeAdd](TIMESTAMP_ADD), Sig[DateSub](DATE_SUB), Sig[DateDiff](DATE_DIFF), Sig[ToUnixTimestamp](TO_UNIX_TIMESTAMP), diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala index 474a271769..7267ce56ba 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution import org.apache.gluten.exception.GlutenException import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer} +import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.utils.PlanUtil - import org.apache.spark.sql.{Column, Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan} @@ -130,10 +130,8 @@ object GlutenImplicits { val (innerNumGlutenNodes, innerFallbackNodeToReason) = withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { // re-plan manually to skip cached data - val newSparkPlan = QueryExecution.createSparkPlan( - spark, - spark.sessionState.planner, - p.inputPlan.logicalLink.get) + val newSparkPlan = SparkShimLoader.getSparkShims.createSparkPlan( + spark, spark.sessionState.planner, p.inputPlan.logicalLink.get) val newExecutedPlan = QueryExecution.prepareExecutedPlan( spark, newSparkPlan diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala index 5cf41b7a9e..570b6d5e0c 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetRowIndexSuite.scala @@ -42,7 +42,7 @@ class GlutenParquetRowIndexSuite extends ParquetRowIndexSuite with 
GlutenSQLTest import testImplicits._ private def readRowGroupRowCounts(path: String): Seq[Long] = { - ParquetFooterReader + ParquetFooterReaderShim .readFooter( spark.sessionState.newHadoopConf(), new Path(path), diff --git a/pom.xml b/pom.xml index c87d46ceb5..b88fa8dfee 100644 --- a/pom.xml +++ b/pom.xml @@ -938,7 +938,7 @@ <profile> <id>scala-2.13</id> <properties> - <scala.version>2.13.16</scala.version> + <scala.version>2.13.17</scala.version> <scala.binary.version>2.13</scala.binary.version> <spotless.scalafmt.version>3.8.3</spotless.scalafmt.version> </properties> @@ -1270,6 +1270,86 @@ </plugins> </build> </profile> + <profile> + <id>spark-4.1</id> + <properties> + <sparkbundle.version>4.1</sparkbundle.version> + <sparkshim.artifactId>spark-sql-columnar-shims-spark41</sparkshim.artifactId> + <spark.version>4.1.0</spark.version> + <iceberg.version>1.10.0</iceberg.version> + <delta.package.name>delta-spark</delta.package.name> + <delta.version>4.0.0</delta.version> + <delta.binary.version>40</delta.binary.version> + <hudi.version>1.1.0</hudi.version> + <paimon.version>1.3.0</paimon.version> + <fasterxml.version>2.18.2</fasterxml.version> + <fasterxml.jackson.version>2.18.2</fasterxml.jackson.version> + <fasterxml.jackson.databind.version>2.18.2</fasterxml.jackson.databind.version> + <hadoop.version>3.4.1</hadoop.version> + <antlr4.version>4.13.1</antlr4.version> + <guava.version>33.4.0-jre</guava.version> + <slf4j.version>2.0.16</slf4j.version> + <log4j.version>2.24.3</log4j.version> + <commons-lang3.version>3.17.0</commons-lang3.version> + <arrow.version>18.1.0</arrow.version> + <arrow-gluten.version>18.1.0</arrow-gluten.version> + </properties> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j2-impl</artifactId> + <version>${log4j.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>enforce-java-17+</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireActiveProfile> + <!-- Spark 4.1 requires Java 17+ --> + <profiles>java-17,java-21</profiles> + <all>false</all> + <message>"-P spark-4.1" requires JDK 17+</message> + </requireActiveProfile> + </rules> + </configuration> + </execution> + <execution> + <id>enforce-scala-213</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireActiveProfile> + <!-- Spark 4.1 requires Scala 2.13 --> + <profiles>scala-2.13</profiles> + <message>"-P spark-4.1" requires Scala 2.13</message> + </requireActiveProfile> + </rules> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> <profile> <id>hadoop-2.7.4</id> <properties> diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index de220cab82..9164f4b7c4 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -355,4 +355,14 @@ trait SparkShims { def unsupportedCodec: Seq[CompressionCodecName] = { Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI) } + + /** + * Shim layer for 
QueryExecution to maintain compatibility across different Spark versions. + * @since Spark + * 4.1 + */ + def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan } diff --git a/shims/pom.xml b/shims/pom.xml index 9a8e639e04..8c10ce640f 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -98,6 +98,12 @@ <module>spark40</module> </modules> </profile> + <profile> + <id>spark-4.1</id> + <modules> + <module>spark41</module> + </modules> + </profile> <profile> <id>default</id> <activation> diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index 367b66f9c4..d1bf07e64b 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} +import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, QueryExecution, SparkPlan, SparkPlanner} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters @@ -303,4 +303,16 @@ class Spark32Shims extends SparkShims { override def getErrorMessage(raiseError: RaiseError): Option[Expression] = { Some(raiseError.child) } + + /** + * Shim layer for QueryExecution to maintain compatibility across different Spark versions. + * + * @since Spark + * 4.1 + */ + override def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan = + QueryExecution.createSparkPlan(sparkSession, planner, plan) } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala new file mode 100644 index 0000000000..b1419e5e62 --- /dev/null +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.ParquetMetadata + +/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. */ +object ParquetFooterReaderShim { + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + fileStatus: FileStatus, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, fileStatus, filter) + } + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + file: Path, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, file, filter) + } +} diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 2b6affcded..a18fb31719 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TimestampFormatte import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} +import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, QueryExecution, SparkPlan, SparkPlanner} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters @@ -408,4 +408,16 @@ class Spark33Shims extends SparkShims { override def getErrorMessage(raiseError: RaiseError): Option[Expression] = { Some(raiseError.child) } + + /** + * Shim layer for QueryExecution to maintain compatibility across different Spark versions. + * + * @since Spark + * 4.1 + */ + override def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan = + QueryExecution.createSparkPlan(sparkSession, planner, plan) } diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala new file mode 100644 index 0000000000..b1419e5e62 --- /dev/null +++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.ParquetMetadata + +/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. */ +object ParquetFooterReaderShim { + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + fileStatus: FileStatus, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, fileStatus, filter) + } + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + file: Path, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, file, filter) + } +} diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 39dd71a6bc..cdbeaa4783 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -651,4 +651,16 @@ class Spark34Shims extends SparkShims { override def getErrorMessage(raiseError: RaiseError): Option[Expression] = { Some(raiseError.child) } + + /** + * Shim layer for QueryExecution to maintain compatibility across different Spark versions. + * + * @since Spark + * 4.1 + */ + override def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan = + QueryExecution.createSparkPlan(sparkSession, planner, plan) } diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala new file mode 100644 index 0000000000..b1419e5e62 --- /dev/null +++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.ParquetMetadata + +/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. */ +object ParquetFooterReaderShim { + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + fileStatus: FileStatus, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, fileStatus, filter) + } + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + file: Path, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, file, filter) + } +} diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 78da08190f..d993cc0bfd 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -702,4 +702,16 @@ class Spark35Shims extends SparkShims { override def getErrorMessage(raiseError: RaiseError): Option[Expression] = { Some(raiseError.child) } + + /** + * Shim layer for QueryExecution to maintain compatibility across different Spark versions. + * + * @since Spark + * 4.1 + */ + override def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan = + QueryExecution.createSparkPlan(sparkSession, planner, plan) } diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala new file mode 100644 index 0000000000..b1419e5e62 --- /dev/null +++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.ParquetMetadata + +/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. 
*/ +object ParquetFooterReaderShim { + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + fileStatus: FileStatus, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, fileStatus, filter) + } + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + file: Path, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, file, filter) + } +} diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala index 80c22f2fad..e5258eafa4 100644 --- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala +++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, InternalRowComparableWrapper, TimestampFormatter} import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan} @@ -752,4 +753,16 @@ class Spark40Shims extends SparkShims { override def unsupportedCodec: Seq[CompressionCodecName] = { Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI, CompressionCodecName.LZ4_RAW) } + + /** + * Shim layer for QueryExecution to maintain compatibility across different Spark versions. + * + * @since Spark + * 4.1 + */ + override def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan = + QueryExecution.createSparkPlan(sparkSession, planner, plan) } diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala new file mode 100644 index 0000000000..b1419e5e62 --- /dev/null +++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.ParquetMetadata + +/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. */ +object ParquetFooterReaderShim { + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + fileStatus: FileStatus, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, fileStatus, filter) + } + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + file: Path, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(configuration, file, filter) + } +} diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala deleted file mode 100644 index 98fcfa5483..0000000000 --- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.datasources.v2 - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} - -class Spark35Scan extends DataSourceV2ScanExecBase { - - override def scan: Scan = throw new UnsupportedOperationException("Spark35Scan") - - override def ordering: Option[Seq[SortOrder]] = throw new UnsupportedOperationException( - "Spark35Scan") - - override def readerFactory: PartitionReaderFactory = - throw new UnsupportedOperationException("Spark35Scan") - - override def keyGroupedPartitioning: Option[Seq[Expression]] = - throw new UnsupportedOperationException("Spark35Scan") - - override protected def inputPartitions: Seq[InputPartition] = - throw new UnsupportedOperationException("Spark35Scan") - - override def inputRDD: RDD[InternalRow] = throw new UnsupportedOperationException("Spark35Scan") - - override def output: Seq[Attribute] = throw new UnsupportedOperationException("Spark35Scan") - - override def productElement(n: Int): Any = throw new UnsupportedOperationException("Spark35Scan") - - override def productArity: Int = throw new UnsupportedOperationException("Spark35Scan") - - override def canEqual(that: Any): Boolean = throw new UnsupportedOperationException("Spark35Scan") - -} diff --git a/shims/spark41/pom.xml b/shims/spark41/pom.xml index c2433e8bdd..3d39f1a932 100644 --- a/shims/spark41/pom.xml +++ b/shims/spark41/pom.xml @@ -22,9 +22,9 @@ <relativePath>../pom.xml</relativePath> </parent> - <artifactId>spark-sql-columnar-shims-spark40</artifactId> + <artifactId>spark-sql-columnar-shims-spark41</artifactId> <packaging>jar</packaging> - <name>Gluten Shims for Spark 4.0</name> + <name>Gluten Shims for Spark 4.1</name> <dependencies> <dependency> diff --git a/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java b/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java index 25adf5d233..7d1347345a 100644 --- a/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java +++ b/shims/spark41/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArrayShim.java @@ -27,6 +27,8 @@ import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; import org.apache.spark.sql.vectorized.ColumnarRow; import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.GeographyVal; +import org.apache.spark.unsafe.types.GeometryVal; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -199,6 +201,16 @@ public class ColumnarArrayShim extends ArrayData { return data.getBinary(offset + ordinal); } + @Override + public GeographyVal getGeography(int ordinal) { + return data.getGeography(offset + ordinal); + } + + @Override + public GeometryVal getGeometry(int ordinal) { + return data.getGeometry(offset + ordinal); + } + @Override public CalendarInterval getInterval(int ordinal) { return data.getInterval(offset + ordinal); diff --git a/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider b/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider index b8136ed835..6c2e7e5181 100644 --- 
a/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider +++ b/shims/spark41/src/main/resources/META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider @@ -1 +1 @@ -org.apache.gluten.sql.shims.spark40.SparkShimProvider \ No newline at end of file +org.apache.gluten.sql.shims.spark41.SparkShimProvider \ No newline at end of file diff --git a/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala b/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala index 7628b210f1..3d329a8f31 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/execution/GenerateTreeStringShim.scala @@ -42,6 +42,7 @@ trait WholeStageTransformerGenerateTreeStringShim extends UnaryExecNode { addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { val prefix = if (printNodeId) "^ " else s"^($stageId) " child.generateTreeString( @@ -53,7 +54,8 @@ trait WholeStageTransformerGenerateTreeStringShim extends UnaryExecNode { addSuffix = false, maxFields, printNodeId = printNodeId, - indent) + printOutputColumns = printOutputColumns, + indent = indent) if (verbose && wholeStageTransformerContextDefined) { append(prefix + "Substrait plan:\n") @@ -74,6 +76,7 @@ trait InputAdapterGenerateTreeStringShim extends UnaryExecNode { addSuffix: Boolean = false, maxFields: Int, printNodeId: Boolean, + printOutputColumns: Boolean, indent: Int = 0): Unit = { child.generateTreeString( depth, @@ -84,6 +87,7 @@ trait InputAdapterGenerateTreeStringShim extends UnaryExecNode { addSuffix = false, maxFields, printNodeId, + printOutputColumns, indent) } } diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala similarity index 98% rename from shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala rename to shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala index 80c22f2fad..ea5f733614 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.sql.shims.spark40 +package org.apache.gluten.sql.shims.spark41 import org.apache.gluten.execution.PartitionedFileUtilShim import org.apache.gluten.expression.{ExpressionNames, Sig} @@ -69,7 +69,7 @@ import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -class Spark40Shims extends SparkShims { +class Spark41Shims extends SparkShims { override def getDistribution( leftKeys: Seq[Expression], @@ -752,4 +752,16 @@ class Spark40Shims extends SparkShims { override def unsupportedCodec: Seq[CompressionCodecName] = { Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI, CompressionCodecName.LZ4_RAW) } + + /** + * Shim layer for QueryExecution to maintain compatibility across different Spark versions. 
+ * + * @since Spark + * 4.1 + */ + override def createSparkPlan( + sparkSession: SparkSession, + planner: SparkPlanner, + plan: LogicalPlan): SparkPlan = + QueryExecution.createSparkPlan(planner, plan) } diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/SparkShimProvider.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/SparkShimProvider.scala similarity index 93% rename from shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/SparkShimProvider.scala rename to shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/SparkShimProvider.scala index 7acf6589d4..9a5b6a6319 100644 --- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark40/SparkShimProvider.scala +++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/SparkShimProvider.scala @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.sql.shims.spark40 +package org.apache.gluten.sql.shims.spark41 import org.apache.gluten.sql.shims.SparkShims class SparkShimProvider extends org.apache.gluten.sql.shims.SparkShimProvider { def createShim: SparkShims = { - new Spark40Shims() + new Spark41Shims() } } diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala index 2dd8ff3867..d9a50f65c7 100644 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala @@ -21,7 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.Partition import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES} -import org.apache.spark.internal.MDC import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala new file mode 100644 index 0000000000..f4cc013ad4 --- /dev/null +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReaderShim.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.ParquetMetadata +import org.apache.parquet.hadoop.util.HadoopInputFile + +/** Shim layer for ParquetFooterReader to maintain compatibility across different Spark versions. */ +object ParquetFooterReaderShim { + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + fileStatus: FileStatus, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter) + } + + /** @since Spark 4.1 */ + def readFooter( + configuration: Configuration, + file: Path, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + ParquetFooterReader.readFooter(HadoopInputFile.fromPath(file, configuration), filter) + } +} diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 0e03fa0ffe..a6a46b10f4 100644 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Key import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper} import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.execution.joins.StoragePartitionJoinParams import org.apache.spark.util.ArrayImplicits._ import com.google.common.base.Objects diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala index 242241254e..046c42ea7b 100644 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.expressions.aggregate.Aggregation import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsRuntimeV2Filtering} import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan +import org.apache.spark.sql.execution.joins.StoragePartitionJoinParams import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ArrayImplicits._ diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala deleted file mode 100644 index 98fcfa5483..0000000000 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/datasources/v2/Spark35Scan.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources.v2 - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} - -class Spark35Scan extends DataSourceV2ScanExecBase { - - override def scan: Scan = throw new UnsupportedOperationException("Spark35Scan") - - override def ordering: Option[Seq[SortOrder]] = throw new UnsupportedOperationException( - "Spark35Scan") - - override def readerFactory: PartitionReaderFactory = - throw new UnsupportedOperationException("Spark35Scan") - - override def keyGroupedPartitioning: Option[Seq[Expression]] = - throw new UnsupportedOperationException("Spark35Scan") - - override protected def inputPartitions: Seq[InputPartition] = - throw new UnsupportedOperationException("Spark35Scan") - - override def inputRDD: RDD[InternalRow] = throw new UnsupportedOperationException("Spark35Scan") - - override def output: Seq[Attribute] = throw new UnsupportedOperationException("Spark35Scan") - - override def productElement(n: Int): Any = throw new UnsupportedOperationException("Spark35Scan") - - override def productArity: Int = throw new UnsupportedOperationException("Spark35Scan") - - override def canEqual(that: Any): Boolean = throw new UnsupportedOperationException("Spark35Scan") - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
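For readers tracing the `createSparkPlan` change described in the table above: Spark 4.1 removes the `SparkSession` parameter from `QueryExecution.createSparkPlan` (apache/spark#50598), so this commit routes callers such as GlutenImplicits through the new `SparkShims.createSparkPlan` hook and lets each version-specific shim adapt the call. The sketch below only illustrates that dispatch pattern; the `Fake*` types and `ShimDispatchDemo` object are hypothetical stand-ins, not Gluten or Spark classes.

```scala
// Illustrative sketch of the shim dispatch used in this commit.
// FakeSession/FakePlanner/FakeLogicalPlan/FakeSparkPlan are placeholders, not
// real Spark types; the real hook lives on org.apache.gluten.sql.shims.SparkShims.
final case class FakeSession(version: String)
final case class FakePlanner(name: String)
final case class FakeLogicalPlan(sql: String)
final case class FakeSparkPlan(desc: String)

trait SparkShimsSketch {
  // Mirrors the new SparkShims.createSparkPlan hook added by this commit.
  def createSparkPlan(
      session: FakeSession,
      planner: FakePlanner,
      plan: FakeLogicalPlan): FakeSparkPlan
}

// Spark 3.2 - 4.0: the planner entry point still takes the session.
class Spark40ShimsSketch extends SparkShimsSketch {
  override def createSparkPlan(
      session: FakeSession,
      planner: FakePlanner,
      plan: FakeLogicalPlan): FakeSparkPlan =
    FakeSparkPlan(s"createSparkPlan(${session.version}, ${planner.name}, ${plan.sql})")
}

// Spark 4.1: the session argument is gone, so the shim simply drops it.
class Spark41ShimsSketch extends SparkShimsSketch {
  override def createSparkPlan(
      session: FakeSession,
      planner: FakePlanner,
      plan: FakeLogicalPlan): FakeSparkPlan =
    FakeSparkPlan(s"createSparkPlan(${planner.name}, ${plan.sql})")
}

object ShimDispatchDemo extends App {
  // Callers such as GlutenImplicits only ever see the trait, so the
  // Spark-version difference stays inside the shim module.
  val shims: SparkShimsSketch =
    if (sys.props.getOrElse("spark.version.sketch", "4.1").startsWith("4.1"))
      new Spark41ShimsSketch
    else new Spark40ShimsSketch

  println(
    shims.createSparkPlan(FakeSession("4.1.0"), FakePlanner("planner"), FakeLogicalPlan("SELECT 1")))
}
```

In the actual patch, the Spark 3.2 through 4.0 shims forward all three arguments to `QueryExecution.createSparkPlan`, while `Spark41Shims` calls the two-argument overload, as shown in the hunks above.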
