This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 2daa1b25 [AURON #985] Expect to convert DataWritingCommandExec to NativeParquetSinkExec (#1274)
2daa1b25 is described below

commit 2daa1b258668f0107afc6b1714d55939e62875cb
Author: Fei Wang <[email protected]>
AuthorDate: Sat Sep 20 05:24:20 2025 -0700

    [AURON #985] Expect to convert DataWritingCommandExec to NativeParquetSinkExec (#1274)
    
    Co-authored-by: Harvey Yue <[email protected]>
---
 .../main/scala/org/apache/spark/sql/auron/ShimsImpl.scala   | 13 ++++++++++---
 .../org/apache/spark/sql/auron/AuronCallNativeWrapper.scala |  2 +-
 .../org/apache/spark/sql/auron/AuronConvertStrategy.scala   |  6 +++++-
 .../src/main/scala/org/apache/spark/sql/auron/Shims.scala   |  3 +++
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index ee47643c..b28ff631 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -60,9 +60,7 @@ import org.apache.spark.sql.execution.ShuffledRowRDD
 import org.apache.spark.sql.execution.ShufflePartitionSpec
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
-import org.apache.spark.sql.execution.adaptive.QueryStageExec
-import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.auron.plan._
 import org.apache.spark.sql.execution.auron.plan.ConvertToNativeExec
 import org.apache.spark.sql.execution.auron.plan.NativeAggBase
@@ -974,6 +972,15 @@ class ShimsImpl extends Shims with Logging {
       isPruningExpr: Boolean,
       fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None
 
+  @sparkver("3.0")
+  override def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan = {
+    exec.initialPlan
+  }
+
+  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  override def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan = {
+    exec.inputPlan
+  }
 }
 
 case class ForceNativeExecutionWrapper(override val child: SparkPlan)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
index 870de90c..c827db32 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
@@ -65,7 +65,7 @@ case class AuronCallNativeWrapper(
   private val batchRows: ArrayBuffer[InternalRow] = ArrayBuffer()
   private var batchCurRowIdx = 0
 
-  logInfo(s"Start executing native plan")
+  logInfo(s"Start executing native plan ${nativePlan.getPhysicalPlanTypeCase}")
   private var nativeRuntimePtr =
     JniBridge.callNative(NativeHelper.nativeMemory, AuronConf.NATIVE_LOG_LEVEL.stringConf(), this)
 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
index 2fb55c0d..a79ff2c4 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.TakeOrderedAndProjectExec
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.UnionExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
 import org.apache.spark.sql.execution.aggregate.SortAggregateExec
@@ -174,7 +175,10 @@ object AuronConvertStrategy extends Logging {
         e.setTagValue(convertStrategyTag, AlwaysConvert)
       case e: LocalTableScanExec =>
         e.setTagValue(convertStrategyTag, AlwaysConvert)
-      case e: DataWritingCommandExec if isNative(e.child) =>
+      case e: DataWritingCommandExec
+          if isNative(e.child) ||
+            (e.child.isInstanceOf[AdaptiveSparkPlanExec] && isNative(
+              Shims.get.getAdaptiveInputPlan(e.child.asInstanceOf[AdaptiveSparkPlanExec]))) =>
         e.setTagValue(convertStrategyTag, AlwaysConvert)
 
       case e if e.getTagValue(convertToNonNativeTag).contains(true) =>
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index 9ced597b..fbac6a92 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.auron.plan._
 import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
 import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase
@@ -255,6 +256,8 @@ abstract class Shims {
   def getMinPartitionNum(sparkSession: SparkSession): Int
 
   def postTransform(plan: SparkPlan, sc: SparkContext): Unit = {}
+
+  def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan
 }
 
 object Shims {

Reply via email to