This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit ed5b65e183765135b9f25435b57e9689caba986a
Author: Chang chen <[email protected]>
AuthorDate: Thu Jan 8 09:51:41 2026 +0800

Spark: Initial support for 4.1.0 UT

## Changes

| Cause | Type | Category | Description | Affected Files |
|-------|------|----------|-------------|----------------|
| N/A | Feat | Build | Update build configuration to support Spark 4.1 UT | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/pom.xml`, `gluten-ut/spark41/pom.xml`, `tools/gluten-it/pom.xml` |
| [#52165](https://github.com/apache/spark/pull/52165) | Fix | Dependency | Update the Parquet dependency to 1.16.0 to avoid a `NoSuchMethodError` | `gluten-ut/spark41/pom.xml` |
| [#51477](https://github.com/apache/spark/pull/51477) | Fix | Compatibility | Update imports to reflect the streaming runtime package refactoring in Apache Spark | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala`, `gluten-ut/spark41/.../GlutenStreamingQuerySuite.scala` |
| [#50674](https://github.com/apache/spark/pull/50674) | Fix | Compatibility | Fix a compatibility issue introduced by `TypedConfigBuilder` | `gluten-substrait/.../ExpressionConverter.scala`, `gluten-ut/spark41/.../GlutenCSVSuite.scala`, `gluten-ut/spark41/.../GlutenJsonSuite.scala` |
| [#49766](https://github.com/apache/spark/pull/49766) | Fix | Compatibility | Disable V2 bucketing in GlutenDynamicPartitionPruningSuite since `spark.sql.sources.v2.bucketing.enabled` is now enabled by default | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala` |
| [#42414](https://github.com/apache/spark/pull/42414), [#53038](https://github.com/apache/spark/pull/53038) | Fix | Bug Fix | Resolve an issue introduced by SPARK-42414, as identified in SPARK-53038 | `backends-velox/.../VeloxBloomFilterAggregate.scala` |
| N/A | Fix | Bug Fix | Enforce row fallback for unsupported cached batches; keep columnar execution only when schema validation succeeds | `backends-velox/.../ColumnarCachedBatchSerializer.scala` |
| [#53132](https://github.com/apache/spark/pull/53132), [#53142](https://github.com/apache/spark/pull/53142) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 KeyGroupedPartitioningSuite tests. Excluded tests: `SPARK-53322*`, `SPARK-54439*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
| [SPARK-53535](https://issues.apache.org/jira/browse/SPARK-53535), [SPARK-54220](https://issues.apache.org/jira/browse/SPARK-54220) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenParquetIOSuite tests. Excluded tests: `SPARK-53535*`, `vectorized reader: missing all struct fields*`, `SPARK-54220*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
| [#52645](https://github.com/apache/spark/pull/52645) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenStreamingQuerySuite tests. Excluded tests: `SPARK-53942: changing the number of stateless shuffle partitions via config`, `SPARK-53942: stateful shuffle partitions are retained from old checkpoint` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
| [#47856](https://github.com/apache/spark/pull/47856) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenDataFrameWindowFunctionsSuite and GlutenJoinSuite tests. Excluded tests: `SPARK-49386: Window spill with more than the inMemoryThreshold and spillSizeThreshold`, `SPARK-49386: test SortMergeJoin (with spill by size threshold)` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
| [#52157](https://github.com/apache/spark/pull/52157) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenQueryExecutionSuite tests. Excluded test: `SPARK-53413: Cleanup shuffle dependencies for commands` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
| [#48470](https://github.com/apache/spark/pull/48470) | 4.1.0 | Test Exclusion | Exclude the split test in GlutenRegexpExpressionsSuite. Excluded test: `GlutenRegexpExpressionsSuite.SPLIT` | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
| [#51623](https://github.com/apache/spark/pull/51623) | 4.1.0 | Test Exclusion | Add `spark.sql.unionOutputPartitioning=false` to the Maven test args. Excluded tests: `GlutenBroadcastExchangeSuite.SPARK-52962`, `GlutenDataFrameSetOperationsSuite.SPARK-52921*` | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/spark41/.../VeloxTestSettings.scala`, `tools/gluten-it/common/.../Suite.scala` |
| N/A | 4.1.0 | Test Exclusion | Exclude failing SQL tests that still need to be fixed for Spark 4.1 compatibility. Excluded tests: `decimalArithmeticOperations.sql`, `identifier-clause.sql`, `keywords.sql`, `literals.sql`, `operators.sql`, `exists-orderby-limit.sql`, `postgreSQL/date.sql`, `nonansi/keywords.sql`, `nonansi/literals.sql`, `datetime-legacy.sql`, `datetime-parsing-invalid.sql`, `misc-functions.sql` | `gluten-ut/spark41/.../VeloxSQLQueryTestSettings.scala` |
| [#11252](https://github.com/apache/incubator-gluten/pull/11252) | 4.1.0 | Test Exclusion | Exclude the Gluten test for SPARK-47939: Explain should work with parameterized queries | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
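To make the `TypedConfigBuilder` row above concrete, the minimal sketch below illustrates the pitfall the `ExpressionConverter` change (see the diff further down) works around: starting with Spark 4.1, `SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)` returns an enum value instead of a `String`, and comparing an enum against a `String` still compiles in Scala but is always false. The `MapKeyDedupPolicyDemo` object is a hypothetical stand-in used only for illustration; it is not Spark's `SQLConf.MapKeyDedupPolicy`.

```scala
// Hypothetical stand-in for an enum-backed config value (illustration only).
object MapKeyDedupPolicyDemo extends Enumeration {
  val EXCEPTION, LAST_WIN = Value
}

object EnumVsStringPitfall {
  def main(args: Array[String]): Unit = {
    // Pre-4.1 style: the config getter returned a String, so a String comparison worked.
    val asString: String = MapKeyDedupPolicyDemo.LAST_WIN.toString
    println(asString == "LAST_WIN") // true

    // 4.1 style: the getter returns the enum value; comparing it to a String
    // compiles (== is defined on Any) but is always false.
    val asEnum: MapKeyDedupPolicyDemo.Value = MapKeyDedupPolicyDemo.LAST_WIN
    println(asEnum == "LAST_WIN") // false, only an "unrelated types" lint warning

    // Comparing the string renderings works on both sides of the change.
    println(asEnum.toString == MapKeyDedupPolicyDemo.LAST_WIN.toString) // true
  }
}
```

Calling `.toString` on both sides, as done in `ExpressionConverter.scala` below, keeps the comparison valid whether the config getter returns a `String` or an enum.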
---
 .github/workflows/velox_backend_x86.yml            |  42 ++++--
 .../aggregate/VeloxBloomFilterAggregate.scala      |  45 ++++++-
 .../execution/ColumnarCachedBatchSerializer.scala  | 146 +++++++++++----------
 .../gluten/expression/ExpressionConverter.scala    |   9 +-
 gluten-ut/pom.xml                                  |   6 +
 gluten-ut/spark41/pom.xml                          |  12 +-
 .../utils/velox/VeloxSQLQueryTestSettings.scala    |  24 ++--
 .../gluten/utils/velox/VeloxTestSettings.scala     |  26 +++-
 .../sql/GlutenDynamicPartitionPruningSuite.scala   |   6 +-
 .../sql/execution/GlutenStreamingQuerySuite.scala  |   2 +-
 .../execution/datasources/csv/GlutenCSVSuite.scala |   4 +-
 .../datasources/json/GlutenJsonSuite.scala         |   4 +-
 .../org/apache/gluten/integration/Suite.scala      |   1 +
 tools/gluten-it/pom.xml                            |  10 ++
 14 files changed, 229 insertions(+), 108 deletions(-)

diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index 5ef70a79bc..51f71e829c 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -107,7 +107,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ "ubuntu:20.04", "ubuntu:22.04" ]
-        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0" ]
+        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1" ]
         java: [ "java-8", "java-11", "java-17", "java-21" ] # Spark supports JDK17 since 3.3.
         exclude:
@@ -141,6 +141,10 @@ jobs:
             java: java-8
           - spark: spark-4.0
             java: java-11
+          - spark: spark-4.1
+            java: java-8
+          - spark: spark-4.1
+            java: java-11
     runs-on: ubuntu-22.04
     container: ${{ matrix.os }}
@@ -182,11 +186,14 @@ jobs:
           cd $GITHUB_WORKSPACE/
           export JAVA_HOME=/usr/lib/jvm/${{ matrix.java }}-openjdk-amd64
           echo "JAVA_HOME: $JAVA_HOME"
-          if [ "${{ matrix.spark }}" = "spark-4.0" ]; then
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
-          else
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
-          fi
+          case "${{ matrix.spark }}" in
+            spark-4.0|spark-4.1)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
+              ;;
+            *)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
+              ;;
+          esac
           cd $GITHUB_WORKSPACE/tools/gluten-it
           $GITHUB_WORKSPACE/$MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }}
           GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
@@ -200,7 +207,7 @@ jobs:
       fail-fast: false
       matrix:
         os: [ "centos:8" ]
-        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0" ]
+        spark: [ "spark-3.2", "spark-3.3", "spark-3.4", "spark-3.5", "spark-4.0", "spark-4.1" ]
         java: [ "java-8", "java-11", "java-17" ] # Spark supports JDK17 since 3.3.
         exclude:
@@ -220,6 +227,10 @@ jobs:
             java: java-8
           - spark: spark-4.0
             java: java-11
+          - spark: spark-4.1
+            java: java-8
+          - spark: spark-4.1
+            java: java-11
     runs-on: ubuntu-22.04
     container: ${{ matrix.os }}
@@ -263,11 +274,14 @@ jobs:
         run: |
           echo "JAVA_HOME: $JAVA_HOME"
           cd $GITHUB_WORKSPACE/
-          if [ "${{ matrix.spark }}" = "spark-4.0" ]; then
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
-          else
-            $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
-          fi
+          case "${{ matrix.spark }}" in
+            spark-4.0|spark-4.1)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pscala-2.13 -Pbackends-velox -DskipTests
+              ;;
+            *)
+              $MVN_CMD clean install -P${{ matrix.spark }} -P${{ matrix.java }} -Pbackends-velox -DskipTests
+              ;;
+          esac
           cd $GITHUB_WORKSPACE/tools/gluten-it
           $GITHUB_WORKSPACE/build/mvn clean install -P${{ matrix.spark }} -P${{ matrix.java }}
       - name: Run TPC-H / TPC-DS
@@ -1521,7 +1535,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
           $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-            -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+            -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
             -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
       - name: Upload test report
         if: always()
@@ -1570,7 +1584,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
           $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
-            -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+            -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
             -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
       - name: Upload test report
         if: always()
diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
index 976abb9e21..a3d6f738a2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala
@@ -25,10 +25,14 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 import org.apache.spark.sql.catalyst.trees.TernaryLike
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.task.TaskResources
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.sketch.BloomFilter
 
+import java.io.Serializable
+
 /**
  * Velox's bloom-filter implementation uses different algorithms internally comparing to vanilla
  * Spark so produces different intermediate aggregate data. Thus we use different filter function /
@@ -61,6 +65,15 @@ case class VeloxBloomFilterAggregate(
       .toLong
   )
 
+  // Mark as lazy so that `updater` is not evaluated during tree transformation.
+  private lazy val updater: BloomFilterUpdater = child.dataType match {
+    case LongType => LongUpdater
+    case IntegerType => IntUpdater
+    case ShortType => ShortUpdater
+    case ByteType => ByteUpdater
+    case _: StringType => BinaryUpdater
+  }
+
   override def first: Expression = child
 
   override def second: Expression = estimatedNumItemsExpression
@@ -97,7 +110,7 @@ case class VeloxBloomFilterAggregate(
     if (value == null) {
       return buffer
     }
-    buffer.putLong(value.asInstanceOf[Long])
+    updater.update(buffer, value)
     buffer
   }
@@ -128,3 +141,33 @@ case class VeloxBloomFilterAggregate(
     copy(inputAggBufferOffset = newOffset)
   }
 }
+
+// see https://github.com/apache/spark/pull/42414
+private trait BloomFilterUpdater {
+  def update(bf: BloomFilter, v: Any): Boolean
+}
+
+private object LongUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Long])
+}
+
+private object IntUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Int])
+}
+
+private object ShortUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Short])
+}
+
+private object ByteUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putLong(v.asInstanceOf[Byte])
+}
+
+private object BinaryUpdater extends BloomFilterUpdater with Serializable {
+  override def update(bf: BloomFilter, v: Any): Boolean =
+    bf.putBinary(v.asInstanceOf[UTF8String].getBytes)
+}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index a04e7d68fb..c05eb4a2fa 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -115,24 +115,24 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
       conf: SQLConf): RDD[CachedBatch] = {
     val localSchema = toStructType(schema)
     if (!validateSchema(localSchema)) {
-      // we can not use columnar cache here, as the `RowToColumnar` does not support this schema
-      return rowBasedCachedBatchSerializer.convertInternalRowToCachedBatch(
+      // we cannot use columnar cache here, as the `RowToColumnar` does not support this schema
+      rowBasedCachedBatchSerializer.convertInternalRowToCachedBatch(
         input,
         schema,
         storageLevel,
         conf)
+    } else {
+      val numRows = conf.columnBatchSize
+      val rddColumnarBatch = input.mapPartitions {
+        it =>
+          RowToVeloxColumnarExec.toColumnarBatchIterator(
+            it,
+            localSchema,
+            numRows,
+            VeloxConfig.get.veloxPreferredBatchBytes)
+      }
+      convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
     }
-
-    val numRows = conf.columnBatchSize
-    val rddColumnarBatch = input.mapPartitions {
-      it =>
-        RowToVeloxColumnarExec.toColumnarBatchIterator(
-          it,
-          localSchema,
-          numRows,
-          VeloxConfig.get.veloxPreferredBatchBytes)
-    }
-    convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
   }
 
   override def convertCachedBatchToInternalRow(
@@ -141,18 +141,18 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
       selectedAttributes: Seq[Attribute],
       conf: SQLConf): RDD[InternalRow] = {
     if (!validateSchema(cacheAttributes)) {
-      // if we do not support this schema that means we are using row-based serializer,
+      // if we do not support this schema, that means we are using row-based serializer,
       // see `convertInternalRowToCachedBatch`, so fallback to vanilla Spark serializer
-      return rowBasedCachedBatchSerializer.convertCachedBatchToInternalRow(
+      rowBasedCachedBatchSerializer.convertCachedBatchToInternalRow(
         input,
         cacheAttributes,
         selectedAttributes,
         conf)
+    } else {
+      val rddColumnarBatch =
+        convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
+      rddColumnarBatch.mapPartitions(it => VeloxColumnarToRowExec.toRowIterator(it))
     }
-
-    val rddColumnarBatch =
-      convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
-    rddColumnarBatch.mapPartitions(it => VeloxColumnarToRowExec.toRowIterator(it))
   }
 
   override def convertColumnarBatchToCachedBatch(
@@ -190,58 +190,68 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
       cacheAttributes: Seq[Attribute],
       selectedAttributes: Seq[Attribute],
       conf: SQLConf): RDD[ColumnarBatch] = {
-    // Find the ordinals and data types of the requested columns.
-    val requestedColumnIndices = selectedAttributes.map {
-      a => cacheAttributes.map(_.exprId).indexOf(a.exprId)
-    }
-    val shouldSelectAttributes = cacheAttributes != selectedAttributes
-    val localSchema = toStructType(cacheAttributes)
-    val timezoneId = SQLConf.get.sessionLocalTimeZone
-    input.mapPartitions {
-      it =>
-        val runtime = Runtimes.contextInstance(
-          BackendsApiManager.getBackendName,
-          "ColumnarCachedBatchSerializer#read")
-        val jniWrapper = ColumnarBatchSerializerJniWrapper
-          .create(runtime)
-        val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
-        val arrowAlloc = ArrowBufferAllocators.contextInstance()
-        val cSchema = ArrowSchema.allocateNew(arrowAlloc)
-        ArrowAbiUtil.exportSchema(arrowAlloc, schema, cSchema)
-        val deserializerHandle = jniWrapper
-          .init(cSchema.memoryAddress())
-        cSchema.close()
-
-        Iterators
-          .wrap(new Iterator[ColumnarBatch] {
-            override def hasNext: Boolean = it.hasNext
-
-            override def next(): ColumnarBatch = {
-              val cachedBatch = it.next().asInstanceOf[CachedColumnarBatch]
-              val batchHandle =
-                jniWrapper
-                  .deserialize(deserializerHandle, cachedBatch.bytes)
-              val batch = ColumnarBatches.create(batchHandle)
-              if (shouldSelectAttributes) {
-                try {
-                  ColumnarBatches.select(
-                    BackendsApiManager.getBackendName,
-                    batch,
-                    requestedColumnIndices.toArray)
-                } finally {
-                  batch.close()
+    if (!validateSchema(cacheAttributes)) {
+      // if we do not support this schema, that means we are using row-based serializer,
+      // see `convertInternalRowToCachedBatch`, so fallback to vanilla Spark serializer
+      rowBasedCachedBatchSerializer.convertCachedBatchToColumnarBatch(
+        input,
+        cacheAttributes,
+        selectedAttributes,
+        conf)
+    } else {
+      // Find the ordinals and data types of the requested columns.
+      val requestedColumnIndices = selectedAttributes.map {
+        a => cacheAttributes.map(_.exprId).indexOf(a.exprId)
+      }
+      val shouldSelectAttributes = cacheAttributes != selectedAttributes
+      val localSchema = toStructType(cacheAttributes)
+      val timezoneId = SQLConf.get.sessionLocalTimeZone
+      input.mapPartitions {
+        it =>
+          val runtime = Runtimes.contextInstance(
+            BackendsApiManager.getBackendName,
+            "ColumnarCachedBatchSerializer#read")
+          val jniWrapper = ColumnarBatchSerializerJniWrapper
+            .create(runtime)
+          val schema = SparkArrowUtil.toArrowSchema(localSchema, timezoneId)
+          val arrowAlloc = ArrowBufferAllocators.contextInstance()
+          val cSchema = ArrowSchema.allocateNew(arrowAlloc)
+          ArrowAbiUtil.exportSchema(arrowAlloc, schema, cSchema)
+          val deserializerHandle = jniWrapper
+            .init(cSchema.memoryAddress())
+          cSchema.close()
+
+          Iterators
+            .wrap(new Iterator[ColumnarBatch] {
+              override def hasNext: Boolean = it.hasNext
+
+              override def next(): ColumnarBatch = {
+                val cachedBatch = it.next().asInstanceOf[CachedColumnarBatch]
+                val batchHandle =
+                  jniWrapper
+                    .deserialize(deserializerHandle, cachedBatch.bytes)
+                val batch = ColumnarBatches.create(batchHandle)
+                if (shouldSelectAttributes) {
+                  try {
+                    ColumnarBatches.select(
+                      BackendsApiManager.getBackendName,
+                      batch,
+                      requestedColumnIndices.toArray)
+                  } finally {
+                    batch.close()
+                  }
+                } else {
+                  batch
                 }
-              } else {
-                batch
               }
+            })
+            .protectInvocationFlow()
+            .recycleIterator {
+              jniWrapper.close(deserializerHandle)
             }
-          })
-          .protectInvocationFlow()
-          .recycleIterator {
-            jniWrapper.close(deserializerHandle)
-          }
-          .recyclePayload(_.close())
-          .create()
+            .recyclePayload(_.close())
+            .create()
+    }
   }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 418de8578f..a810a4ef1d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -831,7 +831,14 @@ object ExpressionConverter extends SQLConfHelper with Logging {
       case t: TransformKeys =>
         // default is `EXCEPTION`
         val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)
-        if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
+
+        // Calling `.toString` on both sides ensures compatibility across all Spark versions.
+        // Starting from Spark 4.1, `SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)` returns
+        // an enum instead of a String. Without `.toString`, the comparison
+        // `mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString` would silently fail
+        // in tests, producing only a "Comparing unrelated types" warning in IntelliJ IDEA,
+        // but no compile-time error.
+        if (mapKeyDedupPolicy.toString == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
           // TODO: Remove after fix ready for
           // https://github.com/facebookincubator/velox/issues/10219
           throw new GlutenNotSupportException(
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index ec0158c41e..58b4e6d658 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -230,5 +230,11 @@
         <module>spark40</module>
       </modules>
     </profile>
+    <profile>
+      <id>spark-4.1</id>
+      <modules>
+        <module>spark41</module>
+      </modules>
+    </profile>
   </profiles>
 </project>
diff --git a/gluten-ut/spark41/pom.xml b/gluten-ut/spark41/pom.xml
index d169abb888..838649a21d 100644
--- a/gluten-ut/spark41/pom.xml
+++ b/gluten-ut/spark41/pom.xml
@@ -8,9 +8,13 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>gluten-ut-spark40</artifactId>
+  <artifactId>gluten-ut-spark41</artifactId>
   <packaging>jar</packaging>
-  <name>Gluten Unit Test Spark40</name>
+  <name>Gluten Unit Test Spark41</name>
+
+  <properties>
+    <parquet.version>1.16.0</parquet.version>
+  </properties>
 
   <dependencies>
     <dependency>
@@ -23,14 +27,14 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.15.2</version>
+      <version>${parquet.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
-      <version>1.15.2</version>
+      <version>${parquet.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
index b085257f70..92c62a4ed3 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxSQLQueryTestSettings.scala
@@ -56,7 +56,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings {
     "current_database_catalog.sql",
     // "datetime-formatting-invalid.sql",
     "datetime-special.sql",
-    "decimalArithmeticOperations.sql",
+    // TODO: fix on Spark-4.1 "decimalArithmeticOperations.sql",
     "describe.sql",
     "describe-part-after-analyze.sql",
     "describe-table-after-alter-table.sql",
@@ -75,7 +75,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings {
     "grouping_set.sql",
     "having.sql",
     "higher-order-functions.sql",
- "identifier-clause.sql", + // TODO: fix on Spark-4.1 "identifier-clause.sql", "ignored.sql", "ilike.sql", "ilike-all.sql", @@ -86,11 +86,11 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { "join-empty-relation.sql", "join-lateral.sql", "json-functions.sql", - "keywords.sql", + // TODO: fix on Spark-4.1 "keywords.sql", "like-all.sql", "like-any.sql", // "limit.sql", - "literals.sql", + // TODO: fix on Spark-4.1 "literals.sql", "map.sql", "mask-functions.sql", "math.sql", @@ -99,7 +99,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { "non-excludable-rule.sql", "null-handling.sql", "null-propagation.sql", - "operators.sql", + // TODO: fix on Spark-4.1 "operators.sql", "order-by-all.sql", // "order-by-nulls-ordering.sql", "order-by-ordinal.sql", @@ -127,7 +127,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { "subquery/exists-subquery/exists-cte.sql", "subquery/exists-subquery/exists-having.sql", "subquery/exists-subquery/exists-joins-and-set-ops.sql", - "subquery/exists-subquery/exists-orderby-limit.sql", + // TODO: fix on Spark-4.1 "subquery/exists-subquery/exists-orderby-limit.sql", "subquery/exists-subquery/exists-outside-filter.sql", "subquery/exists-subquery/exists-within-and-or.sql", "subquery/in-subquery/in-basic.sql", @@ -163,7 +163,7 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { "postgreSQL/case.sql", "postgreSQL/comments.sql", "postgreSQL/create_view.sql", - "postgreSQL/date.sql", + // TODO: fix on Spark-4.1 "postgreSQL/date.sql", "postgreSQL/float4.sql", "postgreSQL/float8.sql", "postgreSQL/groupingsets.sql", @@ -237,8 +237,8 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { "nonansi/double-quoted-identifiers.sql", "nonansi/higher-order-functions.sql", // "nonansi/interval.sql", - "nonansi/keywords.sql", - "nonansi/literals.sql", + // TODO: fix on Spark-4.1 "nonansi/keywords.sql", + // TODO: fix on Spark-4.1 "nonansi/literals.sql", "nonansi/map.sql", "nonansi/math.sql", "nonansi/parse-schema-string.sql", @@ -273,20 +273,20 @@ object VeloxSQLQueryTestSettings extends SQLQueryTestSettings { // Enable ConstantFolding rule for "typeof(...)". "cte.sql", // Removed some result mismatch cases. - "datetime-legacy.sql", + // TODO: fix on Spark-4.1 "datetime-legacy.sql", // Removed some result mismatch cases. "datetime-parsing.sql", // Removed some result mismatch cases. "datetime-parsing-legacy.sql", // Removed some result mismatch cases. - "datetime-parsing-invalid.sql", + // TODO: fix on Spark-4.1 "datetime-parsing-invalid.sql", // Overwrite exception message. See Spark-46550. "hll.sql", // Overwrite exception message. // TODO: Disable due to schema & ANSI gap // "interval.sql", // Enable ConstantFolding rule for "typeof(...)". - "misc-functions.sql", + // TODO: fix on Spark-4.1 "misc-functions.sql", // Removed some result mismatch cases. "regexp-functions.sql", // Removed some result mismatch cases. 
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 24b84d58ae..540479a11f 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -75,6 +75,9 @@ class VeloxTestSettings extends BackendTestSettings {
     .excludeByPrefix("SPARK-41413: partitioned join:")
     .excludeByPrefix("SPARK-42038: partially clustered:")
     .exclude("SPARK-44641: duplicated records when SPJ is not triggered")
+    // TODO: fix on Spark-4.1
+    .excludeByPrefix("SPARK-53322") // see https://github.com/apache/spark/pull/53132
+    .excludeByPrefix("SPARK-54439") // see https://github.com/apache/spark/pull/53142
   enableSuite[GlutenLocalScanSuite]
   enableSuite[GlutenMetadataColumnSuite]
   enableSuite[GlutenSupportsCatalogOptionsSuite]
@@ -196,6 +199,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("random")
     .exclude("SPARK-9127 codegen with long seed")
   enableSuite[GlutenRegexpExpressionsSuite]
+    // TODO: fix on Spark-4.1 introduced by https://github.com/apache/spark/pull/48470
+    .exclude("SPLIT")
   enableSuite[GlutenSortShuffleSuite]
   enableSuite[GlutenSortOrderExpressionsSuite]
   enableSuite[GlutenStringExpressionsSuite]
@@ -471,6 +476,10 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings")
     // TODO: fix in Spark-4.0
     .exclude("explode nested lists crossing a rowgroup boundary")
+    // TODO: fix on Spark-4.1
+    .excludeByPrefix("SPARK-53535") // see https://issues.apache.org/jira/browse/SPARK-53535
+    .excludeByPrefix("vectorized reader: missing all struct fields")
+    .excludeByPrefix("SPARK-54220") // https://issues.apache.org/jira/browse/SPARK-54220
   enableSuite[GlutenParquetV1PartitionDiscoverySuite]
   enableSuite[GlutenParquetV2PartitionDiscoverySuite]
   enableSuite[GlutenParquetProtobufCompatibilitySuite]
@@ -582,6 +591,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOuterJoinSuiteForceShjOff]
   enableSuite[GlutenFallbackStrategiesSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
+    // TODO: fix on Spark-4.1 introduced by see https://github.com/apache/spark/pull/51623
+    .exclude("SPARK-52962: broadcast exchange should not reset metrics")
   enableSuite[GlutenLocalBroadcastExchangeSuite]
   enableSuite[GlutenCoalesceShufflePartitionsSuite]
     // Rewrite for Gluten. Change details are in the inline comments in individual tests.
@@ -722,6 +733,8 @@ class VeloxTestSettings extends BackendTestSettings {
     // Result depends on the implementation for nondeterministic expression rand.
     // Not really an issue.
     .exclude("SPARK-10740: handle nondeterministic expressions correctly for set operations")
+    // TODO: fix on Spark-4.1
+    .excludeByPrefix("SPARK-52921") // see https://github.com/apache/spark/pull/51623
   enableSuite[GlutenDataFrameStatSuite]
   enableSuite[GlutenDataFrameSuite]
     // Rewrite these tests because it checks Spark's physical operators.
@@ -755,6 +768,9 @@ class VeloxTestSettings extends BackendTestSettings {
     // rewrite `WindowExec -> WindowExecTransformer`
     .exclude(
       "SPARK-38237: require all cluster keys for child required distribution for window query")
+    // TODO: fix on Spark-4.1 introduced by https://github.com/apache/spark/pull/47856
+    .exclude(
+      "SPARK-49386: Window spill with more than the inMemoryThreshold and spillSizeThreshold")
   enableSuite[GlutenDataFrameWindowFramesSuite]
   enableSuite[GlutenDataFrameWriterV2Suite]
   enableSuite[GlutenDatasetAggregatorSuite]
@@ -823,6 +839,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenJoinSuite]
     // exclude as it check spark plan
     .exclude("SPARK-36794: Ignore duplicated key when building relation for semi/anti hash join")
+    // TODO: fix on Spark-4.1 introduced by https://github.com/apache/spark/pull/47856
+    .exclude("SPARK-49386: test SortMergeJoin (with spill by size threshold)")
   enableSuite[GlutenMathFunctionsSuite]
   enableSuite[GlutenMetadataCacheSuite]
     .exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
@@ -860,6 +878,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-38173: Quoted column cannot be recognized correctly when quotedRegexColumnNames is true")
     // Rewrite with Gluten's explained result.
     .exclude("SPARK-47939: Explain should work with parameterized queries")
+    // TODO: fix on Spark-4.1 based on https://github.com/apache/incubator-gluten/pull/11252
+    .excludeGlutenTest("SPARK-47939: Explain should work with parameterized queries")
   enableSuite[GlutenSQLQueryTestSuite]
   enableSuite[GlutenStatisticsCollectionSuite]
     // The output byte size of Velox is different
@@ -942,6 +962,9 @@ class VeloxTestSettings extends BackendTestSettings {
     .excludeByPrefix("SPARK-51187")
     // Rewrite for the query plan check
     .excludeByPrefix("SPARK-49905")
+    // TODO: fix on Spark-4.1 introduced by https://github.com/apache/spark/pull/52645
+    .exclude("SPARK-53942: changing the number of stateless shuffle partitions via config")
+    .exclude("SPARK-53942: stateful shuffle partitions are retained from old checkpoint")
   enableSuite[GlutenQueryExecutionSuite]
     // Rewritten to set root logger level to INFO so that logs can be parsed
     .exclude("Logging plan changes for execution")
@@ -949,7 +972,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("dumping query execution info to a file - explainMode=formatted")
     // TODO: fix in Spark-4.0
     .exclude("SPARK-47289: extended explain info")
-
+    // TODO: fix on Spark-4.1 introduced by https://github.com/apache/spark/pull/52157
+    .exclude("SPARK-53413: Cleanup shuffle dependencies for commands")
   override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
 // scalastyle:on line.size.limit
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index dc96c09bc2..d2222aa8a6 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 
 abstract class GlutenDynamicPartitionPruningSuiteBase
@@ -155,7 +155,9 @@ abstract class GlutenDynamicPartitionPruningSuiteBase
     "SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
       "canonicalization and exchange reuse") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
         val df = sql(""" WITH view1 as (
                        | SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70
                        | )
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
index bda9c97eb5..d59c046937 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.GlutenSQLTestsTrait
 import org.apache.spark.sql.execution.exchange.REQUIRED_BY_STATEFUL_OPERATOR
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.streaming._
 
 class GlutenStreamingQuerySuite extends StreamingQuerySuite with GlutenSQLTestsTrait {
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
index 6cfa9f2028..63f0327e07 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/csv/GlutenCSVSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.exception.GlutenException
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, Row}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types.{DateType, IntegerType, StructType, TimestampType}
 
 import org.scalatest.exceptions.TestFailedException
@@ -131,5 +131,5 @@ class GlutenCSVv2Suite extends GlutenCSVSuite {
 class GlutenCSVLegacyTimeParserSuite extends GlutenCSVSuite {
   override def sparkConf: SparkConf =
     super.sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, LegacyBehaviorPolicy.LEGACY)
 }
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
index 45e8e3e9e7..3fefc9bdd4 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/json/GlutenJsonSuite.scala
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{sources, GlutenSQLTestsBaseTrait, Row}
 import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, NoopCache}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types.{DateType, IntegerType, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -136,5 +136,5 @@ class GlutenJsonLegacyTimeParserSuite extends GlutenJsonSuite with GlutenSQLTest
   override def sparkConf: SparkConf =
     super.sparkConf
-      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy")
+      .set(SQLConf.LEGACY_TIME_PARSER_POLICY, LegacyBehaviorPolicy.LEGACY)
 }
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 2ea814df27..53280a72cb 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -69,6 +69,7 @@ abstract class Suite(
     sessionSwitcher.addDefaultConf("spark.sql.broadcastTimeout", "1800")
     sessionSwitcher.addDefaultConf("spark.network.io.preferDirectBufs", "false")
     sessionSwitcher.addDefaultConf("spark.unsafe.exceptionOnMemoryLeak", s"$errorOnMemLeak")
+    sessionSwitcher.addDefaultConf("spark.sql.unionOutputPartitioning", "false")
 
     if (dataSource() == "delta") {
       sessionSwitcher.addDefaultConf(
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 27be4a1b5b..77e4eab9f6 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -327,6 +327,16 @@
         <delta.version>4.0.0</delta.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-4.1</id>
+      <properties>
+        <spark.version>4.1.0</spark.version>
+        <scala.version>2.13.17</scala.version>
+        <scala.binary.version>2.13</scala.binary.version>
+        <delta.package.name>delta-spark</delta.package.name>
+        <delta.version>4.0.0</delta.version>
+      </properties>
+    </profile>
     <profile>
       <id>celeborn-0.5</id>
       <properties>

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
