This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 2daa1b25 [AURON #985] Expect to convert DataWritingCommandExec to NativeParquetSinkExec (#1274)
2daa1b25 is described below
commit 2daa1b258668f0107afc6b1714d55939e62875cb
Author: Fei Wang <[email protected]>
AuthorDate: Sat Sep 20 05:24:20 2025 -0700
[AURON #985] Expect to convert DataWritingCommandExec to NativeParquetSinkExec (#1274)
Co-authored-by: Harvey Yue <[email protected]>
---
.../main/scala/org/apache/spark/sql/auron/ShimsImpl.scala | 13 ++++++++++---
.../org/apache/spark/sql/auron/AuronCallNativeWrapper.scala | 2 +-
.../org/apache/spark/sql/auron/AuronConvertStrategy.scala | 6 +++++-
.../src/main/scala/org/apache/spark/sql/auron/Shims.scala | 3 +++
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index ee47643c..b28ff631 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -60,9 +60,7 @@ import org.apache.spark.sql.execution.ShuffledRowRDD
import org.apache.spark.sql.execution.ShufflePartitionSpec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
-import org.apache.spark.sql.execution.adaptive.QueryStageExec
-import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.auron.plan._
import org.apache.spark.sql.execution.auron.plan.ConvertToNativeExec
import org.apache.spark.sql.execution.auron.plan.NativeAggBase
@@ -974,6 +972,15 @@ class ShimsImpl extends Shims with Logging {
isPruningExpr: Boolean,
fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None
+ @sparkver("3.0")
+ override def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan = {
+ exec.initialPlan
+ }
+
+ @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+ override def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan = {
+ exec.inputPlan
+ }
}
case class ForceNativeExecutionWrapper(override val child: SparkPlan)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
index 870de90c..c827db32 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
@@ -65,7 +65,7 @@ case class AuronCallNativeWrapper(
private val batchRows: ArrayBuffer[InternalRow] = ArrayBuffer()
private var batchCurRowIdx = 0
- logInfo(s"Start executing native plan")
+ logInfo(s"Start executing native plan ${nativePlan.getPhysicalPlanTypeCase}")
private var nativeRuntimePtr =
JniBridge.callNative(NativeHelper.nativeMemory,
AuronConf.NATIVE_LOG_LEVEL.stringConf(), this)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
index 2fb55c0d..a79ff2c4 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.UnionExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.execution.aggregate.SortAggregateExec
@@ -174,7 +175,10 @@ object AuronConvertStrategy extends Logging {
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: LocalTableScanExec =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
- case e: DataWritingCommandExec if isNative(e.child) =>
+      case e: DataWritingCommandExec
+          if isNative(e.child) ||
+            (e.child.isInstanceOf[AdaptiveSparkPlanExec] && isNative(
+              Shims.get.getAdaptiveInputPlan(e.child.asInstanceOf[AdaptiveSparkPlanExec]))) =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e if e.getTagValue(convertToNonNativeTag).contains(true) =>
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index 9ced597b..fbac6a92 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.auron.plan._
import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase
@@ -255,6 +256,8 @@ abstract class Shims {
def getMinPartitionNum(sparkSession: SparkSession): Int
def postTransform(plan: SparkPlan, sc: SparkContext): Unit = {}
+
+ def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan
}
object Shims {