This is an automated email from the ASF dual-hosted git repository. lwz9103 pushed a commit to branch liquid in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit 5d522ed380a4eafa0bdf05cd000d6cb9ffd32f19 Author: Wenzheng Liu <[email protected]> AuthorDate: Mon Apr 21 19:10:39 2025 +0800 Both support kyspark-3.3 and kyspark-3.5 (#49) (cherry picked from commit 44b34d1b1298ff6aba55ea5462a7d3ea6aaa65b6) --- .../sql/catalyst/expressions/KapExpressions.scala | 9 ++++- ep/build-clickhouse/src/package.sh | 43 +++++++++++++++------- gluten-ut/pom.xml | 4 +- .../utils/clickhouse/ClickHouseTestSettings.scala | 11 ++++++ .../sql/execution/joins/GlutenOuterJoinSuite.scala | 28 ++++++++++++-- pom.xml | 20 ++++++---- shims/pom.xml | 4 +- shims/spark33/pom.xml | 2 +- shims/spark35/pom.xml | 2 +- .../gluten/sql/shims/spark35/Spark35Shims.scala | 4 +- .../sql/execution/AbstractFileSourceScanExec.scala | 4 +- 11 files changed, 97 insertions(+), 34 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/catalyst/expressions/KapExpressions.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/catalyst/expressions/KapExpressions.scala index 497ebf0e1a..87c4eb2c91 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/catalyst/expressions/KapExpressions.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/catalyst/expressions/KapExpressions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.util.{KapDateTimeUtils, TimeUtil, TypeUtils} +import org.apache.spark.sql.catalyst.util.{KapDateTimeUtils, TimeUtil} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -39,7 +39,12 @@ case class Sum0(child: Expression) extends DeclarativeAggregate with ImplicitCas override def inputTypes: Seq[AbstractDataType] = Seq(NumericType) override def checkInputDataTypes(): TypeCheckResult = - TypeUtils.checkForNumericExpr(child.dataType, "function sum") + if (child.dataType.isInstanceOf[NumericType] || child.dataType == NullType) { + TypeCheckResult.TypeCheckSuccess + } else { + TypeCheckResult.TypeCheckFailure( + s"Function sum0 requires numeric types, not ${child.dataType.catalogString}") + } private lazy val resultType = child.dataType match { case DecimalType.Fixed(precision, scale) => diff --git a/ep/build-clickhouse/src/package.sh b/ep/build-clickhouse/src/package.sh index 2ceab18dd9..93de5b9806 100755 --- a/ep/build-clickhouse/src/package.sh +++ b/ep/build-clickhouse/src/package.sh @@ -33,7 +33,7 @@ function detect_os_version() { } detect_os_version -DEFAULT_SPARK_PROFILE="kyspark" +DEFAULT_SPARK_PROFILE="kyspark-3.3" function get_project_version() { cd "${GLUTEN_SOURCE}" # use mvn command to get project version @@ -48,6 +48,8 @@ OS_ARCH=$(uname -m) PACKAGE_NAME=gluten-${BUILD_VERSION}-${OS_VERSION}-${OS_ARCH} PACKAGE_DIR_PATH="${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" +spark_scala_versions=("3.3_2.12" "3.5_2.13") + # cleanup working directory [[ -d "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" ]] && rm -rf "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" [[ -d "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}".tar.gz ]] && rm -f "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}".tar.gz @@ -61,11 +63,15 @@ mkdir -p "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/bin mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/conf mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/jars -mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/jars/spark32 -mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/jars/spark33 mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/libs mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/logs +for ssv in "${spark_scala_versions[@]}" +do + spark_version=$(echo ${ssv%_*} | tr -d '.') + mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/jars/spark"$spark_version" +done + # create BUILD_INFO { echo "BUILD_VERSION=${BUILD_VERSION}" @@ -78,16 +84,28 @@ mkdir "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}"/logs cp "${GLUTEN_SOURCE}"/LICENSE "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" cp "${GLUTEN_SOURCE}"/README.md "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}" -# build gluten with kyspark -mvn clean install -Pbackends-clickhouse -Pkyspark -Pceleborn -Piceberg -Pdelta -DskipTests -Dcheckstyle.skip -cp "${GLUTEN_SOURCE}"/backends-clickhouse/target/gluten-*-spark-3.3-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark33/gluten.jar -delta_version_33=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.version}' -Pkyspark --non-recursive exec:exec) -wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/${delta_version_33}/delta-core_2.12-${delta_version_33}.jar -P "${PACKAGE_DIR_PATH}"/jars/spark33 -wget https://repo1.maven.org/maven2/io/delta/delta-storage/${delta_version_33}/delta-storage-${delta_version_33}.jar -P "${PACKAGE_DIR_PATH}"/jars/spark33 +function build_gluten_by_spark_version() { + spark_profile=$1 + scala_version=$2 + sv=$(echo "$spark_profile" | tr -d '.') + echo "build gluten with kyspark ${spark_profile}, scala ${scala_version}" + + mvn clean install -Pbackends-clickhouse -Pkyspark-"${spark_profile}" -Pscala-"${scala_version}" -Pceleborn -Piceberg -Pdelta -DskipTests -Dcheckstyle.skip + cp "${GLUTEN_SOURCE}"/backends-clickhouse/target/gluten-*-spark-"${spark_profile}"-jar-with-dependencies.jar "${PACKAGE_DIR_PATH}"/jars/spark"${sv}"/gluten.jar + delta_version=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.version}' -Pkyspark-"${spark_profile}" --non-recursive exec:exec) + delta_package_name=$(mvn -q -Dexec.executable="echo" -Dexec.args='${delta.package.name}' -Pkyspark-"${spark_profile}" --non-recursive exec:exec) + wget https://repo1.maven.org/maven2/io/delta/"${delta_package_name}"_${scala_version}/"${delta_version}"/"${delta_package_name}"_${scala_version}-"${delta_version}".jar -P "${PACKAGE_DIR_PATH}"/jars/spark"${sv}" + wget https://repo1.maven.org/maven2/io/delta/delta-storage/"${delta_version}"/delta-storage-"${delta_version}".jar -P "${PACKAGE_DIR_PATH}"/jars/spark"${sv}" + celeborn_version=$(mvn -q -P${DEFAULT_SPARK_PROFILE} -Dexec.executable="echo" -Dexec.args='${celeborn.version}' --non-recursive exec:exec) + wget https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_${scala_version}/${celeborn_version}/celeborn-client-spark-3-shaded_${scala_version}-${celeborn_version}.jar -P "${PACKAGE_DIR_PATH}"/jars/spark"${sv}" +} -# download common 3rd party jars -celeborn_version=$(mvn -q -P${DEFAULT_SPARK_PROFILE} -Dexec.executable="echo" -Dexec.args='${celeborn.version}' --non-recursive exec:exec) -wget https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/${celeborn_version}/celeborn-client-spark-3-shaded_2.12-${celeborn_version}.jar -P "${PACKAGE_DIR_PATH}"/jars/spark33 +for ssv in "${spark_scala_versions[@]}" +do + spark_profile="${ssv%_*}" + scala_version="${ssv#*_}" + build_gluten_by_spark_version "$spark_profile" "$scala_version" +done # build libch.so bash "${GLUTEN_SOURCE}"/ep/build-clickhouse/src/build_clickhouse.sh @@ -103,4 +121,3 @@ tar -czf "${PACKAGE_NAME}".tar.gz "${PACKAGE_NAME}" echo "Build package successfully, package path:" echo "${GLUTEN_SOURCE}"/dist/"${PACKAGE_NAME}".tar.gz - diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml index 41cfdd7d39..197b48ca36 100644 --- a/gluten-ut/pom.xml +++ b/gluten-ut/pom.xml @@ -201,7 +201,7 @@ </modules> </profile> <profile> - <id>kyspark</id> + <id>kyspark-3.3</id> <modules> <module>spark33</module> </modules> @@ -213,7 +213,7 @@ </modules> </profile> <profile> - <id>spark-3.5</id> + <id>kyspark-3.5</id> <modules> <module>spark35</module> </modules> diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index ff01c5452d..af59499568 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -522,6 +522,9 @@ class ClickHouseTestSettings extends BackendTestSettings { "session window groupBy with multiple keys statement - keys overlapped with sessions") .excludeCH("SPARK-36465: filter out events with negative/zero gap duration") .excludeCH("SPARK-36724: Support timestamp_ntz as a type of time column for SessionWindow") + // GLUTEN-9351 sum() not support with fallback operator + .excludeCH( + "SPARK-49836 using window fn with window as parameter should preserve parent operator") enableSuite[GlutenDataFrameSetOperationsSuite] .exclude("SPARK-37371: UnionExec should support columnar if all children support columnar") // Result depends on the implementation for nondeterministic expression rand. @@ -1677,6 +1680,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen off)") .excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)") .excludeCH("SPARK-32717: AQEOptimizer should respect excludedRules configuration") + // GLUTEN-9343 Incorrect result when using df.queryExecution.executeCollect directly + .excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off (whole-stage-codegen off)") + .excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off (whole-stage-codegen on)") enableSuite[GlutenOuterJoinSuiteForceShjOn] .excludeCH("basic left outer join using ShuffledHashJoin (whole-stage-codegen off)") .excludeCH("basic left outer join using ShuffledHashJoin (whole-stage-codegen on)") @@ -1702,6 +1708,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .excludeCH("full outer join with unique keys using ShuffledHashJoin (whole-stage-codegen on)") .excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen off)") .excludeCH("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)") + // GLUTEN-9343 Incorrect result when using df.queryExecution.executeCollect directly + .excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off (whole-stage-codegen off)") + .excludeCH("SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off (whole-stage-codegen on)") enableSuite[GlutenParametersSuite] enableSuite[GlutenParquetCodecSuite] // codec not supported in native @@ -1882,6 +1891,8 @@ class ClickHouseTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // google guava cache conflict with sparkproject guava + .exclude("SPARK-33449: simple select queries with file meta cache") enableSuite[GlutenParquetV2SchemaPruningSuite] .excludeCH("Spark vectorized reader - without partition data column - select a single complex field and in where clause") .excludeCH("Spark vectorized reader - with partition data column - select a single complex field and in where clause") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/joins/GlutenOuterJoinSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/joins/GlutenOuterJoinSuite.scala index 04a555cfce..3438307297 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/joins/GlutenOuterJoinSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/joins/GlutenOuterJoinSuite.scala @@ -19,16 +19,38 @@ package org.apache.spark.sql.execution.joins import org.apache.gluten.config.GlutenConfig import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsBaseTrait +import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait, GlutenTestUtils} -class GlutenOuterJoinSuiteForceShjOn extends OuterJoinSuite with GlutenSQLTestsBaseTrait { +class GlutenOuterJoinSuiteBase extends OuterJoinSuite with GlutenSQLTestsBaseTrait { + testWithWholeStageCodegenOnAndOff( + "GLUTEN-9343: SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off") { + _ => + def join(hint: String): DataFrame = { + sql(s""" + |SELECT /*+ $hint */ * + |FROM testData t1 + |LEFT OUTER JOIN + |testData2 t2 + |ON key = a AND concat(value, b) = '12' + |""".stripMargin) + } + + val df1 = join("SHUFFLE_HASH(t1)") + val df2 = join("SHUFFLE_MERGE(t1)") + GlutenTestUtils.compareAnswers(df1.collect().toSeq, df2.collect().toSeq, sort = true) + } +} + +class GlutenOuterJoinSuiteForceShjOn extends GlutenOuterJoinSuiteBase with GlutenSQLTestsBaseTrait { override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "true") } } -class GlutenOuterJoinSuiteForceShjOff extends OuterJoinSuite with GlutenSQLTestsBaseTrait { +class GlutenOuterJoinSuiteForceShjOff + extends GlutenOuterJoinSuiteBase + with GlutenSQLTestsBaseTrait { override def sparkConf: SparkConf = { super.sparkConf .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") diff --git a/pom.xml b/pom.xml index 44cf84aed0..1d631efdc8 100644 --- a/pom.xml +++ b/pom.xml @@ -70,8 +70,8 @@ <scala.version>2.12.15</scala.version> <spark.major.version>3</spark.major.version> <sparkbundle.version>3.3</sparkbundle.version> - <spark.version>3.3.0-kylin-5.2.1.0-SNAPSHOT</spark.version> - <sparkshim.artifactId>spark-sql-columnar-shims-kyspark</sparkshim.artifactId> + <spark.version>3.3.0-kylin-5.2.1.0-SNAPSHOT</spark.version> + <sparkshim.artifactId>spark-sql-columnar-shims-kyspark33</sparkshim.artifactId> <iceberg.version>1.5.0</iceberg.version> <delta.package.name>delta-core</delta.package.name> <delta.version>2.4.0</delta.version> @@ -316,10 +316,10 @@ </properties> </profile> <profile> - <id>kyspark</id> + <id>kyspark-3.3</id> <properties> - <sparkbundle.version>3.3</sparkbundle.version> - <sparkshim.artifactId>spark-sql-columnar-shims-kyspark</sparkshim.artifactId> + <sparkbundle.version>3.3</sparkbundle.version> + <sparkshim.artifactId>spark-sql-columnar-shims-kyspark33</sparkshim.artifactId> <spark.version>3.3.0-kylin-5.2.1.0-SNAPSHOT</spark.version> <iceberg.version>1.5.0</iceberg.version> <delta.package.name>delta-core</delta.package.name> @@ -344,11 +344,11 @@ </properties> </profile> <profile> - <id>spark-3.5</id> + <id>kyspark-3.5</id> <properties> <sparkbundle.version>3.5</sparkbundle.version> - <sparkshim.artifactId>spark-sql-columnar-shims-spark35</sparkshim.artifactId> - <spark.version>3.5.2</spark.version> + <sparkshim.artifactId>spark-sql-columnar-shims-kyspark35</sparkshim.artifactId> + <spark.version>3.5.5-kylin-5.2.3-SNAPSHOT</spark.version> <iceberg.version>1.5.0</iceberg.version> <delta.package.name>delta-spark</delta.package.name> <delta.version>3.2.0</delta.version> @@ -1083,6 +1083,10 @@ <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </exclusion> <exclusion> <groupId>org.apache.tomcat</groupId> <artifactId>tomcat-annotation-api</artifactId> diff --git a/shims/pom.xml b/shims/pom.xml index 5bf788741e..908bb2be4e 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -61,7 +61,7 @@ </modules> </profile> <profile> - <id>kyspark</id> + <id>kyspark-3.3</id> <modules> <module>spark33</module> </modules> @@ -73,7 +73,7 @@ </modules> </profile> <profile> - <id>spark-3.5</id> + <id>kyspark-3.5</id> <modules> <module>spark35</module> </modules> diff --git a/shims/spark33/pom.xml b/shims/spark33/pom.xml index 35136dcecf..40e2d08b8e 100644 --- a/shims/spark33/pom.xml +++ b/shims/spark33/pom.xml @@ -24,7 +24,7 @@ <relativePath>../pom.xml</relativePath> </parent> - <artifactId>spark-sql-columnar-shims-kyspark</artifactId> + <artifactId>spark-sql-columnar-shims-kyspark33</artifactId> <name>Gluten Shims for Spark 3.3</name> <packaging>jar</packaging> diff --git a/shims/spark35/pom.xml b/shims/spark35/pom.xml index 4b0506a182..e915fe1b21 100644 --- a/shims/spark35/pom.xml +++ b/shims/spark35/pom.xml @@ -24,7 +24,7 @@ <relativePath>../pom.xml</relativePath> </parent> - <artifactId>spark-sql-columnar-shims-spark35</artifactId> + <artifactId>spark-sql-columnar-shims-kyspark35</artifactId> <name>Gluten Shims for Spark 3.5</name> <packaging>jar</packaging> diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 543bef6c55..9c87974a14 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -163,7 +163,8 @@ class Spark35Shims extends SparkShims { override def filesGroupedToBuckets( selectedPartitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { selectedPartitions - .flatMap(p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))) + .flatMap( + p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values))) .groupBy { f => BucketingUtils @@ -432,6 +433,7 @@ class Spark35Shims extends SparkShims { PartitionedFileUtil.splitFiles( sparkSession, FileStatusWithMetadata(file, metadata), + filePath, isSplitable, maxSplitBytes, partitionValues) diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala index a83c763c45..6cca913dd7 100644 --- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala +++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala @@ -181,7 +181,8 @@ abstract class AbstractFileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions - .flatMap(p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))) + .flatMap( + p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values))) .groupBy { f => BucketingUtils @@ -272,6 +273,7 @@ abstract class AbstractFileSourceScanExec( PartitionedFileUtil.splitFiles( sparkSession = relation.sparkSession, file = file, + filePath = file.getPath, isSplitable = isSplitable, maxSplitBytes = maxSplitBytes, partitionValues = partition.values --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
