This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 801e07996a4 [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic
801e07996a4 is described below

commit 801e07996a4d4ea448b6fc468cc6c9d6904ceef2
Author: wangyazhi <wangya...@baidu.com>
AuthorDate: Tue Dec 20 21:35:37 2022 -0600

    [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic
    
    ### What changes were proposed in this pull request?
    ExecutorAllocationManager only keeps a count of speculative tasks: `stageAttemptToNumSpeculativeTasks` is incremented when a speculative task is submitted and decremented only when a speculative task ends.
    If the original task finishes before its speculative copy starts, that speculative task is never scheduled, which leaks entries in `stageAttemptToNumSpeculativeTasks` and skews the calculation of the target number of executors.
    
    This PR fixes the issue by adding the task index to the `SparkListenerSpeculativeTaskSubmitted` event and tracking submitted speculative tasks by task index; the index is removed when the speculative task starts or when the task finishes (whether speculatively or not).
    
    ### Why are the changes needed?
    To fix idle executors caused by pending speculative tasks whose original tasks have already finished.
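    
    As a concrete illustration (the numbers are hypothetical and the formula is a simplification of how the manager turns outstanding tasks into an executor target):
    
    ```scala
    // Hypothetical setup: 4 cores per executor, 1 core per task.
    val tasksPerExecutor = 4
    // All real tasks have finished, but 10 speculative copies were submitted and never
    // started, so the leaked counter still reports 10 outstanding speculative tasks.
    val leakedPendingSpeculative = 10
    val runningTasks = 0
    // Simplified demand calculation: ceil(outstanding tasks / tasks per executor).
    val targetExecutors =
      math.ceil((leakedPendingSpeculative + runningTasks).toDouble / tasksPerExecutor).toInt
    println(targetExecutors)  // 3 executors stay requested even though no work remains
    ```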
    
    ### Does this PR introduce _any_ user-facing change?
    The DeveloperApi event `SparkListenerSpeculativeTaskSubmitted` adds a `taskIndex` field with a default value of -1.
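    
    For illustration, a custom listener could read the new field as sketched below (the listener class name and the println are made up for this example; `onSpeculativeTaskSubmitted` is the existing `SparkListener` callback):
    
    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}
    
    // Hypothetical listener used only to illustrate the new field.
    class SpeculationAuditListener extends SparkListener {
      override def onSpeculativeTaskSubmitted(
          event: SparkListenerSpeculativeTaskSubmitted): Unit = {
        // taskIndex stays at its default of -1 when the event was created through the
        // old two-argument constructor, so guard against that.
        if (event.taskIndex >= 0) {
          println(s"Speculative copy requested: stage=${event.stageId}, " +
            s"attempt=${event.stageAttemptId}, taskIndex=${event.taskIndex}")
        }
      }
    }
    ```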
    
    ### How was this patch tested?
    Added a comprehensive unit test.
    Passes the GA checks.
    
    Closes #38711 from toujours33/SPARK-41192.
    
    Lead-authored-by: wangyazhi <wangya...@baidu.com>
    Co-authored-by: toujours33 <wangya...@baidu.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 38 +++++----
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 14 ++--
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |  2 +-
 .../org/apache/spark/scheduler/SparkListener.scala | 16 +++-
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../spark/ExecutorAllocationManagerSuite.scala     | 97 ++++++++++++++++++++--
 .../spark/scheduler/TaskSetManagerSuite.scala      |  2 +-
 7 files changed, 139 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 204ffc39a11..f06312c15cf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
     private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
-    // Number of speculative tasks pending/running in each stageAttempt
-    private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
-    // The speculative tasks started in each stageAttempt
+    // Map from each stageAttempt to a set of running speculative task indexes
+    // TODO(SPARK-41192): We simply need an Int for this.
     private val stageAttemptToSpeculativeTaskIndices =
+      new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+    // Map from each stageAttempt to a set of pending speculative task indexes
+    private val stageAttemptToPendingSpeculativeTasks =
       new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
 
     private val resourceProfileIdToStageAttempt =
@@ -722,7 +724,7 @@ private[spark] class ExecutorAllocationManager(
         // because the attempt may still have running tasks,
         // even after another attempt for the stage is submitted.
         stageAttemptToNumTasks -= stageAttempt
-        stageAttemptToNumSpeculativeTasks -= stageAttempt
+        stageAttemptToPendingSpeculativeTasks -= stageAttempt
         stageAttemptToTaskIndices -= stageAttempt
         stageAttemptToSpeculativeTaskIndices -= stageAttempt
         stageAttemptToExecutorPlacementHints -= stageAttempt
@@ -733,7 +735,9 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) {
+        if (stageAttemptToNumTasks.isEmpty
+          && stageAttemptToPendingSpeculativeTasks.isEmpty
+          && stageAttemptToSpeculativeTaskIndices.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
         }
       }
@@ -751,6 +755,8 @@ private[spark] class ExecutorAllocationManager(
         if (taskStart.taskInfo.speculative) {
           stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
             new mutable.HashSet[Int]) += taskIndex
+          stageAttemptToPendingSpeculativeTasks
+            .get(stageAttempt).foreach(_.remove(taskIndex))
         } else {
           stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
             new mutable.HashSet[Int]) += taskIndex
@@ -776,15 +782,14 @@ private[spark] class ExecutorAllocationManager(
         }
         if (taskEnd.taskInfo.speculative) {
           stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
-          // If the previous task attempt succeeded first and it was the last task in a stage,
-          // the stage may have been removed before handing this speculative TaskEnd event.
-          if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
-            stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
-          }
         }
 
         taskEnd.reason match {
-          case Success | _: TaskKilled =>
+          case Success =>
+            // Remove pending speculative task in case the normal task
+            // is finished before starting the speculative task
+            stageAttemptToPendingSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))
+          case _: TaskKilled =>
           case _ =>
             if (!hasPendingTasks) {
             // If the task failed (not intentionally killed), we expect it to be resubmitted
@@ -810,9 +815,10 @@ private[spark] class ExecutorAllocationManager(
       val stageId = speculativeTask.stageId
       val stageAttemptId = speculativeTask.stageAttemptId
       val stageAttempt = StageAttempt(stageId, stageAttemptId)
+      val taskIndex = speculativeTask.taskIndex
       allocationManager.synchronized {
-        stageAttemptToNumSpeculativeTasks(stageAttempt) =
-          stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0) + 1
+        stageAttemptToPendingSpeculativeTasks.getOrElseUpdate(stageAttempt,
+          new mutable.HashSet[Int]).add(taskIndex)
         allocationManager.onSchedulerBacklogged()
       }
     }
@@ -843,7 +849,7 @@ private[spark] class ExecutorAllocationManager(
     def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = {
       if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
           !stageAttemptToNumTasks.contains(stageAttempt) &&
-          !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
+          !stageAttemptToPendingSpeculativeTasks.contains(stageAttempt) &&
           !stageAttemptToTaskIndices.contains(stageAttempt) &&
           !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
       ) {
@@ -896,9 +902,7 @@ private[spark] class ExecutorAllocationManager(
     }
 
     private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
-      val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
-      val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
-      numTotalTasks - numRunning
+      stageAttemptToPendingSpeculativeTasks.get(attempt).map(_.size).getOrElse(0)
     }
 
     /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c55d44dfd59..bb17a987717 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -383,8 +383,8 @@ private[spark] class DAGScheduler(
   /**
    * Called by the TaskSetManager when it decides a speculative task is needed.
    */
-  def speculativeTaskSubmitted(task: Task[_]): Unit = {
-    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task, taskIndex))
   }
 
   /**
@@ -1178,8 +1178,10 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
-  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
-    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
+    val speculativeTaskSubmittedEvent = new SparkListenerSpeculativeTaskSubmitted(
+      task.stageId, task.stageAttemptId, taskIndex, task.partitionId)
+    listenerBus.post(speculativeTaskSubmittedEvent)
   }
 
   private[scheduler] def handleUnschedulableTaskSetAdded(
@@ -2962,8 +2964,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
 
-    case SpeculativeTaskSubmitted(task) =>
-      dagScheduler.handleSpeculativeTaskSubmitted(task)
+    case SpeculativeTaskSubmitted(task, taskIndex) =>
+      dagScheduler.handleSpeculativeTaskSubmitted(task, taskIndex)
 
     case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
       dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f9df8de620f..c16e5ea03d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -99,7 +99,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
 
 private[scheduler]
-case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
+case class SpeculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1) extends DAGSchedulerEvent
 
 private[scheduler]
 case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index a9d86347940..d3bbbaffd59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -56,7 +56,21 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
 case class SparkListenerSpeculativeTaskSubmitted(
     stageId: Int,
     stageAttemptId: Int = 0)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+  // Note: this is here for backwards-compatibility with older versions of this event which
+  // didn't store taskIndex
+  private var _taskIndex: Int = -1
+  private var _partitionId: Int = -1
+
+  def taskIndex: Int = _taskIndex
+  def partitionId: Int = _partitionId
+
+  def this(stageId: Int, stageAttemptId: Int, taskIndex: Int, partitionId: Int) = {
+    this(stageId, stageAttemptId)
+    _partitionId = partitionId
+    _taskIndex = taskIndex
+  }
+}
 
 @DeveloperApi
 case class SparkListenerTaskEnd(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2ed1ce1a49e..cbb8fd0a334 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1143,7 +1143,7 @@ private[spark] class TaskSetManager(
               " than %.0f ms(%d speculatable tasks in this taskset now)")
              .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
           speculatableTasks += index
-          sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
+          sched.dagScheduler.speculativeTaskSubmitted(tasks(index), index)
         }
         foundTasksResult |= speculated
       }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c616c43fe1b..1cb913b248f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -461,6 +461,16 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsTargetForDefaultProfileId(manager) === 10)
   }
 
+  private def speculativeTaskSubmitEventFromTaskIndex(
+    stageId: Int,
+    stageAttemptId: Int = 0,
+    taskIndex: Int = -1,
+    partitionId: Int = -1): SparkListenerSpeculativeTaskSubmitted = {
+    val event = new SparkListenerSpeculativeTaskSubmitted(stageId, stageAttemptId,
+      taskIndex = taskIndex, partitionId = partitionId)
+    event
+  }
+
   test("add executors when speculative tasks added") {
     val manager = createManager(createConf(0, 10, 0))
 
@@ -469,13 +479,13 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
     post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
     // Verify that we're capped at number of tasks including the speculative ones in the stage
-    post(SparkListenerSpeculativeTaskSubmitted(1))
+    post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 0))
     assert(numExecutorsTargetForDefaultProfileId(manager) === 0)
     assert(numExecutorsToAddForDefaultProfile(manager) === 1)
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
     doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
-    post(SparkListenerSpeculativeTaskSubmitted(1))
-    post(SparkListenerSpeculativeTaskSubmitted(1))
+    post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 1))
+    post(speculativeTaskSubmitEventFromTaskIndex(1, taskIndex = 2))
     assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
     assert(numExecutorsToAddForDefaultProfile(manager) === 2)
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
@@ -671,6 +681,83 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     onExecutorRemoved(manager, "5")
   }
 
+  test("SPARK-41192: remove executors when task finished before speculative 
task scheduled") {
+    val clock = new ManualClock()
+    val stage = createStageInfo(0, 40)
+    val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4)
+    val manager = createManager(conf, clock = clock)
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // submit 40 tasks, total executors needed = 40/4 = 10
+    post(SparkListenerStageSubmitted(stage))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+
+    (0 until 10).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
+    (0 until 40).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 10)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 10)
+    // 30 tasks (0 - 29) finished
+    (0 until 30).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
+    // 10 speculative tasks (30 - 39) launch for the remaining tasks
+    (30 until 40).foreach { index =>
+      post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = index))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 5)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+    (0 until 5).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString))}
+    (0 until 5).foreach { i => onExecutorRemoved(manager, i.toString)}
+
+    // 5 original tasks (30 - 34) finished before speculative task start,
+    // the speculative task will be removed from pending tasks
+    // executors needed = (5 + 5) / 4 + 1
+    (30 until 35).map { i =>
+      createTaskInfo(i, i, executorId = s"${i / 4}")}
+      .foreach { info => post(
+        SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
+
+    (40 until 45).map { i =>
+      createTaskInfo(i, i - 5, executorId = s"${i / 4}", speculative = true)
+    }.foreach {
+      info => post(SparkListenerTaskStart(0, 0, info))
+    }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
+
+    (35 until 39).map { i =>
+      createTaskInfo(i, i, executorId = s"${i / 4}")
+    }.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))
+    }
+    (35 until 39).map { i =>
+      createTaskInfo(i + 5, i, executorId = s"${(i + 5) / 4}", speculative = true)
+    }.foreach {
+      info => post(SparkListenerTaskEnd(0, 0, null, TaskKilled("attempt"),
+        info, new ExecutorMetrics, null))
+    }
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
+    assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+    assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
+  }
+
   test("SPARK-30511 remove executors when speculative tasks end") {
     val clock = new ManualClock()
     val stage = createStageInfo(0, 40)
@@ -707,7 +794,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     (0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}
 
     // 10 speculative tasks (30 - 39) launch for the remaining tasks
-    (30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
+    (30 to 39).foreach { i => post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = i))}
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
     doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
     assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
@@ -785,7 +872,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null))
     post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
       createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
-    post(SparkListenerSpeculativeTaskSubmitted(0))
+    post(speculativeTaskSubmitEventFromTaskIndex(0, taskIndex = 39))
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     // maxNeeded = 1, allocate one more to satisfy speculation locality requirement
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2dc7f0d0dfa..a3b9eff8084 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -74,7 +74,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
     taskScheduler.taskSetsFailed += taskSet.id
   }
 
-  override def speculativeTaskSubmitted(task: Task[_]): Unit = {
+  override def speculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
     taskScheduler.speculativeTasks += task.partitionId
   }
 }

