This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-decimal-cast in repository https://gitbox.apache.org/repos/asf/auron.git
commit c44088557602444e766a05c918780f7ff3b43dfe Author: zhangli20 <[email protected]> AuthorDate: Thu Dec 11 20:41:28 2025 +0800 introduce BlazeRuleEngine from spark241kwaiae to spark3 --- pom.xml | 2 +- .../org/apache/spark/sql/blaze/ShimsImpl.scala | 81 ++++++++++++++++++++-- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 0301c25f..1a186ce6 100644 --- a/pom.xml +++ b/pom.xml @@ -416,7 +416,7 @@ <id>spark-3.5</id> <properties> <shimName>spark-3.5</shimName> - <pkgSuffix>-kwai-adapt-cele060-fix-non-partition-len</pkgSuffix> + <pkgSuffix>-kwai-hotfix-rule-engine</pkgSuffix> <shimPkg>spark-extension-shims-spark3</shimPkg> <javaVersion>1.8</javaVersion> <scalaVersion>2.12</scalaVersion> diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala index 44ce358e..8b74e735 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.blaze import java.io.File import java.util.UUID - import org.apache.commons.lang3.reflect.FieldUtils import org.apache.spark.OneToOneDependency import org.apache.spark.ShuffleDependency @@ -25,6 +24,7 @@ import org.apache.spark.SparkEnv import org.apache.spark.SparkException import org.apache.spark.TaskContext import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.OptionalConfigEntry import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.IndexShuffleBlockResolver @@ -37,7 +37,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.blaze.BlazeConverters.ForceNativeExecutionWrapperBase import org.apache.spark.sql.blaze.NativeConverters.NativeExprWrapperBase import org.apache.spark.sql.catalyst.InternalRow 
-import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.Generator @@ -101,7 +101,7 @@ import org.apache.spark.sql.execution.blaze.plan._ import org.apache.spark.sql.execution.blaze.shuffle.RssPartitionWriterBase import org.apache.spark.sql.execution.blaze.shuffle.celeborn.BlazeCelebornShuffleManager import org.apache.spark.sql.execution.blaze.shuffle.BlazeBlockStoreShuffleReaderBase -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionedFile} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.blaze.plan.NativeBroadcastJoinExec import org.apache.spark.sql.execution.joins.blaze.plan.NativeShuffledHashJoinExecProvider @@ -112,12 +112,17 @@ import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.storage.BlockManagerId import org.apache.spark.storage.FileSegment import org.apache.spark.sql.execution.blaze.shuffle.uniffle.BlazeUniffleShuffleManager +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.storage.ShuffleDataBlockId import org.apache.spark.util.ExternalBlockStoreUtils -import org.blaze.{protobuf => pb, sparkver} +import org.blaze.{sparkver, protobuf => pb} class ShimsImpl extends Shims with Logging { @@ 
-156,6 +161,10 @@ class ShimsImpl extends Shims with Logging { logWarning(s"${BlazeConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion") } } + + extension.injectOptimizerRule(sparkSession => { + BlazeRuleEngine(sparkSession) + }) } override def createConvertToNativeExec(child: SparkPlan): ConvertToNativeBase = @@ -1046,3 +1055,67 @@ case class NativeExprWrapper( @sparkver("3.2 / 3.3 / 3.4 / 3.5") override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy() } + + /* Logical-plan rule (injected above via injectOptimizerRule): walks the plan bottom-up and, when it sees a scan Blaze cannot handle, turns Blaze off for the query and records why. The plan itself is returned unmodified. */ +case class BlazeRuleEngine(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging { + import BlazeSparkSessionExtension._ + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.foreachUp { + case p @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) => + // non parquet table rule: only Parquet/ORC file formats (when their scan flags are on) are eligible for Blaze + if (!( + BlazeConverters.enableScanParquet && fsRelation.fileFormat.isInstanceOf[ParquetFileFormat] + || BlazeConverters.enableScanOrc && fsRelation.fileFormat.isInstanceOf[OrcFileFormat] + )) { + turnOffBlazeWithReason(p.conf, BlazeMissPatterns.NonParquetFormat) + } + + // read encrypted table rule: disabled whenever the app-level encrypted-table flag is set (not per-relation) + val readEncryptedTableEnable = sparkSession.sparkContext.conf + .getBoolean("spark.hive.exist.read.encrypted.table", defaultValue = false) + if (readEncryptedTableEnable) { + turnOffBlazeWithReason(p.conf, BlazeMissPatterns.ReadEncryptedTable) + } + + // skip scanning dp_dd.*** tables because parquet statistics don't provide min/max info /* NOTE(review): reason tag is ReadHbaseTable but the condition matches any table name containing "dp_dd" — confirm the tag and the broad substring match are intended */ + if (p.catalogTable.map(_.identifier.unquotedString).getOrElse("").contains("dp_dd")) { + turnOffBlazeWithReason(p.conf, BlazeMissPatterns.ReadHbaseTable) + } + + // skip scan offline_attribution_mover_v2 (blacklisted table) + // issue: https://team.corp.kuaishou.com/task/T6538018 + if (p.catalogTable + .map(_.identifier.unquotedString) + .getOrElse("") + .contains("offline_attribution_mover_v2")) { + turnOffBlazeWithReason(p.conf, BlazeMissPatterns.ReadBlacklistedTable) + } + + /* Hive (non file-based) relations are never supported by Blaze scans */ + case h: HiveTableRelation => + turnOffBlazeWithReason(h.conf, 
BlazeMissPatterns.NonParquetFormat) + + case _ => + } + plan + } + + /* Flips the per-query Blaze-enabled SQLConf flag to false and stores the miss reason in the application-wide SparkConf. NOTE(review): SparkConf.set overwrites, so only the last reason survives when several nodes trigger. */ + private def turnOffBlazeWithReason(planConf: SQLConf, blazeMissPattern: String): Unit = { + planConf.setConf(blazeEnabledKey, false) + sparkSession.sparkContext.conf + .set(BlazeRuleEngine.blazeMissPatterns, blazeMissPattern) + } + + /* Reason tags reported when Blaze is turned off. NOTE(review): extends Enumeration but the members are plain String vals, not Enumeration#Value — the Enumeration parent is unused; a plain object (or a sealed ADT) would be clearer. */ + object BlazeMissPatterns extends Enumeration { + val NonParquetFormat = "NonParquetFormat" + val ReadEncryptedTable = "ReadEncryptedTable" + val ReadHbaseTable = "ReadHbaseTable" + val ReadBlacklistedTable = "ReadBlacklistedTable" + } +} + + /* Companion: optional SQLConf entry under which turnOffBlazeWithReason records the (last) miss reason. */ +object BlazeRuleEngine { + lazy val blazeMissPatterns: OptionalConfigEntry[String] = SQLConf + .buildConf("spark.blaze.blazeMissPatterns") + .stringConf + .createOptional +}
