Copilot commented on code in PR #1923:
URL: https://github.com/apache/auron/pull/1923#discussion_r2704712952


##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala:
##########
@@ -157,13 +158,53 @@ case class NativeShuffleExchangeExec(
         }
         writer.stop(true).get
       }
+
+      @sparkver("4.0")
+      override def write(
+          inputs: Iterator[_],
+          dep: ShuffleDependency[_, _, _],
+          mapId: Long,
+          mapIndex: Int,
+          context: TaskContext): MapStatus = {
+
+        // [SPARK-44605][CORE] Refined the internal ShuffleWriteProcessor API.
+        // Due to the restructuring of the write method in the API, we optimized and refactored the original Partition.
+        val rdd = dep.rdd
+        val partition = rdd.partitions(mapIndex)
+
+        val writer = SparkEnv.get.shuffleManager.getWriter(
+          dep.shuffleHandle,
+          mapId,
+          context,
+          createMetricsReporter(context))
+
+        writer match {
+          case writer: AuronRssShuffleWriterBase[_, _] =>
+            writer.nativeRssShuffleWrite(
+              rdd.asInstanceOf[MapPartitionsRDD[_, _]].prev.asInstanceOf[NativeRDD],
+              dep,
+              mapId.toInt,
+              context,
+              partition,
+              numPartitions)
+
+          case writer: AuronShuffleWriterBase[_, _] =>
+            writer.nativeShuffleWrite(
+              rdd.asInstanceOf[MapPartitionsRDD[_, _]].prev.asInstanceOf[NativeRDD],
+              dep,
+              mapId.toInt,
+              context,
+              partition)
+        }
+        writer.stop(true).get
+      }

Review Comment:
   The `inputs` parameter is not used in this method implementation. According 
to SPARK-44605, the API was refined to pass an Iterator instead of RDD, but 
this implementation retrieves the RDD from the dependency and reconstructs the 
partition. Verify that ignoring the `inputs` parameter is intentional and 
correct for the Auron native execution model, or consider using it if it 
contains the actual input data for this map task.



##########
spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala:
##########
@@ -172,17 +213,22 @@ case class NativeShuffleExchangeExec(
   override def canChangeNumPartitions: Boolean =
     outputPartitioning != SinglePartition
 
-  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0")
   override def shuffleOrigin = {
     import org.apache.spark.sql.execution.exchange.ShuffleOrigin;
     _shuffleOrigin.get.asInstanceOf[ShuffleOrigin]
   }
 
-  @sparkver("3.2 / 3.3 / 3.4 / 3.5")
+  @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0")
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     copy(child = newChild)
 
   @sparkver("3.0 / 3.1")
   override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
     copy(child = newChildren.head)
+
+  @sparkver("4.0")
+  override def shuffleId: Int = {
+    shuffleDependency.shuffleId;

Review Comment:
   The semicolon at the end of line 232 is unnecessary in Scala and should be 
removed for consistency with Scala style conventions.
   ```suggestion
       shuffleDependency.shuffleId
   ```



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala:
##########
@@ -137,9 +137,21 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi
     dummyBroadcasted.asInstanceOf[Broadcast[T]]
   }
 
-  def doExecuteBroadcastNative[T](): broadcast.Broadcast[T] = {
+  @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  def getBroadcastTimeout: Long = {
     val conf = SparkSession.getActiveSession.map(_.sqlContext.conf).orNull
-    val timeout: Long = conf.broadcastTimeout
+    conf.broadcastTimeout
+  }
+
+  @sparkver("4.0")
+  def getBroadcastTimeout: Long = {
+    SparkSession.getActiveSession
+      .map(_.conf.get("spark.sql.broadcastTimeout").toLong)
+      .getOrElse(300L)

Review Comment:
   The default timeout value of 300L seconds (5 minutes) is hardcoded. This 
should match Spark's default broadcast timeout configuration value. Verify that 
300 seconds is the correct default for Spark 4.0, or consider using Spark's 
configuration constant if available to ensure consistency with Spark's defaults.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to