This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 373b5661 [AURON #1419] Add neverConvertReasonTag to record the reason for non-conversion. (#1420)
373b5661 is described below
commit 373b5661d87a230e4b114a02a86e88f24c9de57d
Author: guixiaowen <[email protected]>
AuthorDate: Thu Oct 9 11:31:59 2025 +0800
[AURON #1419] Add neverConvertReasonTag to record the reason for non-conversion. (#1420)
Co-authored-by: guihuawen <[email protected]>
---
.../spark/sql/auron/AuronConvertStrategy.scala | 42 +++++++++--
.../apache/spark/sql/auron/AuronConverters.scala | 85 +++++++++++++++++++---
2 files changed, 111 insertions(+), 16 deletions(-)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
index a79ff2c4..0e4dca72 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
@@ -53,6 +53,7 @@ object AuronConvertStrategy extends Logging {
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("auron.convertible")
val convertToNonNativeTag: TreeNodeTag[Boolean] =
TreeNodeTag("auron.convertToNonNative")
val convertStrategyTag: TreeNodeTag[ConvertStrategy] =
TreeNodeTag("auron.convert.strategy")
+ val neverConvertReasonTag: TreeNodeTag[String] =
TreeNodeTag("auron.never.convert.reason")
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag(
"auron.child.ordering.required")
val joinSmallerSideTag: TreeNodeTag[BuildSide] =
TreeNodeTag("auron.join.smallerSide")
@@ -79,6 +80,9 @@ object AuronConvertStrategy extends Logging {
case _ =>
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
+ exec.setTagValue(
+ neverConvertReasonTag,
+ s"${exec.getClass.getSimpleName} is not supported yet.")
}
danglingChildren = newDangling :+ converted
}
@@ -190,6 +194,9 @@ object AuronConvertStrategy extends Logging {
case e =>
// not marked -- default to NeverConvert
e.setTagValue(convertStrategyTag, NeverConvert)
+ e.setTagValue(
+ neverConvertReasonTag,
+ s"${exec.getClass.getSimpleName} not marked, default to
NeverConvert.")
}
}
@@ -206,9 +213,10 @@ object AuronConvertStrategy extends Logging {
while (!finished) {
finished = true
- val dontConvertIf = (exec: SparkPlan, condition: Boolean) => {
+ val dontConvertIf = (exec: SparkPlan, condition: Boolean,
neverConvertReason: String) => {
if (condition) {
exec.setTagValue(convertStrategyTag, NeverConvert)
+ exec.setTagValue(neverConvertReasonTag, neverConvertReason)
finished = false
}
}
@@ -218,28 +226,41 @@ object AuronConvertStrategy extends Logging {
// don't use NativeFilter because it requires ConvertToNative with a
lot of records
if (!isNeverConvert(e) && e.isInstanceOf[FilterExec]) {
val child = e.children.head
- dontConvertIf(e, isNeverConvert(child))
+ dontConvertIf(
+ e,
+ isNeverConvert(child),
+ s"${e.getClass.getSimpleName}, children is not native.")
}
// NonNative -> NativeAgg
// don't use NativeAgg because it requires ConvertToNative with a lot
of records
if (!isNeverConvert(e) && isAggregate(e)) {
val child = e.children.head
- dontConvertIf(e, isNeverConvert(child))
+ dontConvertIf(
+ e,
+ isNeverConvert(child),
+ s"${e.getClass.getSimpleName}, children is not native.")
}
// Agg -> NativeShuffle
// don't use NativeShuffle because the next stage is like to use
non-native shuffle reader
if (!isNeverConvert(e) && e.isInstanceOf[ShuffleExchangeLike]) {
val child = e.children.head
- dontConvertIf(e, isAggregate(child) && isNeverConvert(child))
+ dontConvertIf(
+ e,
+ isAggregate(child) && isNeverConvert(child),
+ s"${e.getClass.getSimpleName}, children is not native and children
is agg.")
}
// NativeExpand -> NonNative
// don't use NativeExpand because it requires C2R with a lot of records
if (isNeverConvert(e)) {
e.children.find(_.isInstanceOf[ExpandExec]) match {
- case Some(expand) => dontConvertIf(expand, !isNeverConvert(expand))
+ case Some(expand) =>
+ dontConvertIf(
+ expand,
+ !isNeverConvert(expand),
+ s"${e.getClass.getSimpleName}, children is nativeExpand.")
case _ =>
}
}
@@ -248,7 +269,11 @@ object AuronConvertStrategy extends Logging {
// don't use NativeParquetScan because it requires C2R with a lot of
records
if (isNeverConvert(e)) {
e.children.find(_.isInstanceOf[FileSourceScanExec]) match {
- case Some(scan) => dontConvertIf(scan, !isNeverConvert(scan))
+ case Some(scan) =>
+ dontConvertIf(
+ scan,
+ !isNeverConvert(scan),
+ s"${e.getClass.getSimpleName}, children is nativeParquetScan.")
case _ =>
}
}
@@ -257,7 +282,10 @@ object AuronConvertStrategy extends Logging {
// don't use native sort
if (isNeverConvert(e)) {
e.children.filter(_.isInstanceOf[SortExec]).foreach { sort =>
- dontConvertIf(sort, !isNeverConvert(sort) &&
isNeverConvert(sort.children.head))
+ dontConvertIf(
+ sort,
+ !isNeverConvert(sort) && isNeverConvert(sort.children.head),
+ s"${e.getClass.getSimpleName}, children and parent both are not
native.")
}
}
}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index a090b3ef..8f075ac4 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -27,12 +27,7 @@ import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
import org.apache.spark.Partition
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.auron.AuronConvertStrategy.childOrderingRequiredTag
-import org.apache.spark.sql.auron.AuronConvertStrategy.convertibleTag
-import org.apache.spark.sql.auron.AuronConvertStrategy.convertStrategyTag
-import org.apache.spark.sql.auron.AuronConvertStrategy.convertToNonNativeTag
-import org.apache.spark.sql.auron.AuronConvertStrategy.isNeverConvert
-import org.apache.spark.sql.auron.AuronConvertStrategy.joinSmallerSideTag
+import
org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag,
convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert,
joinSmallerSideTag, neverConvertReasonTag}
import org.apache.spark.sql.auron.NativeConverters.{roundRobinTypeSupported,
scalarTypeSupported, StubExpr}
import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
@@ -265,16 +260,70 @@ object AuronConverters extends Logging {
if (Shims.get.isNative(exec)) { // for QueryStageInput and
CustomShuffleReader
exec.setTagValue(convertibleTag, true)
exec.setTagValue(convertStrategyTag, AlwaysConvert)
+ exec
} else {
- exec.setTagValue(convertibleTag, false)
- exec.setTagValue(convertStrategyTag, NeverConvert)
+ addNeverConvertReasonTag(exec)
}
- exec
}
}
}
}
+  /**
+   * Marks `exec` as not convertible to native and records why in
+   * `neverConvertReasonTag`.
+   *
+   * If the `enable*` switch guarding the operator's type is off, the reason
+   * names the matching `spark.auron.enable.*` key. Otherwise the operator is
+   * reported as not supported yet.
+   *
+   * @param exec the plan node that will be left non-native
+   * @return the same `exec`, tagged with `convertibleTag = false`,
+   *         `convertStrategyTag = NeverConvert` and the computed reason
+   */
+  private def addNeverConvertReasonTag(exec: SparkPlan): SparkPlan = {
+    // Builds "Conversion disabled: spark.auron.enable.<key>=false."
+    def disabledBy(key: String): String =
+      s"Conversion disabled: spark.auron.enable.$key=false."
+
+    val neverConvertReason = exec match {
+      case _: FileSourceScanExec if !enableScan => disabledBy("scan")
+      case _: ProjectExec if !enableProject => disabledBy("project")
+      case _: FilterExec if !enableFilter => disabledBy("filter")
+      case _: SortExec if !enableSort => disabledBy("sort")
+      case _: UnionExec if !enableUnion => disabledBy("union")
+      case _: SortMergeJoinExec if !enableSmj => disabledBy("smj")
+      case _: ShuffledHashJoinExec if !enableShj => disabledBy("shj")
+      case _: BroadcastHashJoinExec if !enableBhj => disabledBy("bhj")
+      case _: BroadcastNestedLoopJoinExec if !enableBnlj => disabledBy("bnlj")
+      case _: LocalLimitExec if !enableLocalLimit => disabledBy("local.limit")
+      case _: GlobalLimitExec if !enableGlobalLimit => disabledBy("global.limit")
+      case _: TakeOrderedAndProjectExec if !enableTakeOrderedAndProject =>
+        disabledBy("take.ordered.and.project")
+      // All three physical aggregate operators are gated by the same flag.
+      case _: HashAggregateExec | _: ObjectHashAggregateExec | _: SortAggregateExec
+          if !enableAggr =>
+        disabledBy("aggr")
+      case _: ExpandExec if !enableExpand => disabledBy("expand")
+      case _: WindowExec if !enableWindow => disabledBy("window")
+      // Matched by simple class name, not by type.
+      // NOTE(review): presumably because WindowGroupLimitExec does not exist
+      // in every supported Spark version -- confirm.
+      case _: UnaryExecNode
+          if exec.getClass.getSimpleName == "WindowGroupLimitExec" &&
+            !enableWindowGroupLimit =>
+        disabledBy("window.group.limit")
+      case _: GenerateExec if !enableGenerate => disabledBy("generate")
+      case _: LocalTableScanExec if !enableLocalTableScan =>
+        disabledBy("local.table.scan")
+      case _: DataWritingCommandExec if !enableDataWriting =>
+        disabledBy("data.writing")
+      case _ =>
+        s"${exec.getClass.getSimpleName} is not supported yet."
+    }
+    exec.setTagValue(convertibleTag, false)
+    exec.setTagValue(convertStrategyTag, NeverConvert)
+    exec.setTagValue(neverConvertReasonTag, neverConvertReason)
+    exec
+  }
+
def tryConvert[T <: SparkPlan](exec: T, convert: T => SparkPlan): SparkPlan
= {
try {
exec.setTagValue(convertibleTag, true)
@@ -283,8 +332,26 @@ object AuronConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(s"Falling back exec: ${exec.getClass.getSimpleName}:
${e.getMessage}")
+ val neverConvertReason = e match {
+ case _: AssertionError =>
+ exec match {
+ case _: FileSourceScanExec if enableScan =>
+ if (!enableScanParquet) {
+ "Conversion disabled: spark.auron.enable.scan.parquet=false."
+ } else if (!enableScanOrc) {
+ "Conversion disabled: spark.auron.enable.scan.orc=false."
+ } else {
+ s"Falling back exec: ${exec.getClass.getSimpleName}:
${e.getMessage}"
+ }
+ case _ =>
+ s"Falling back exec: ${exec.getClass.getSimpleName}:
${e.getMessage}"
+ }
+ case _ =>
+ s"Falling back exec: ${exec.getClass.getSimpleName}:
${e.getMessage}"
+ }
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
+ exec.setTagValue(neverConvertReasonTag, neverConvertReason)
exec
}
}