Repository: spark
Updated Branches:
  refs/heads/branch-1.3 64730a3de -> e54525f4a


[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator (branch-1.3 backport)

This is a backport of #8544 to `branch-1.3` for inclusion in 1.3.2.

Author: Josh Rosen <[email protected]>

Closes #8790 from JoshRosen/SPARK-10381-1.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e54525f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e54525f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e54525f4

Branch: refs/heads/branch-1.3
Commit: e54525f4a693c3531efba73f5b0f40e6e26f475e
Parents: 64730a3
Author: Josh Rosen <[email protected]>
Authored: Tue Sep 22 13:37:25 2015 -0700
Committer: Josh Rosen <[email protected]>
Committed: Tue Sep 22 13:37:25 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |  3 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  5 +-
 .../spark/executor/CommitDeniedException.scala  |  4 +-
 .../spark/mapred/SparkHadoopMapRedUtil.scala    | 11 ++--
 .../apache/spark/scheduler/DAGScheduler.scala   |  7 +-
 .../scheduler/OutputCommitCoordinator.scala     | 49 +++++++-------
 .../org/apache/spark/scheduler/TaskInfo.scala   |  7 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  5 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 ...utputCommitCoordinatorIntegrationSuite.scala | 68 ++++++++++++++++++++
 .../OutputCommitCoordinatorSuite.scala          | 24 ++++---
 .../apache/spark/util/JsonProtocolSuite.scala   |  2 +-
 project/MimaExcludes.scala                      | 17 +++++
 .../spark/sql/hive/hiveWriterContainers.scala   |  2 +-
 14 files changed, 152 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala 
b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 2ec42d3..c159882 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -103,8 +103,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   def commit() {
-    SparkHadoopMapRedUtil.commitTask(
-      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), 
jobID, splitID)
   }
 
   def commitJob() {

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala 
b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 29a5cd5..b7fe655 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -154,10 +154,9 @@ case object TaskKilled extends TaskFailedReason {
 case class TaskCommitDenied(
     jobID: Int,
     partitionID: Int,
-    attemptID: Int)
-  extends TaskFailedReason {
+    attemptNumber: Int) extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task 
commit)" +
-    s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala 
b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f7604a3..a3b39b2 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,10 +26,10 @@ class CommitDeniedException(
     msg: String,
     jobID: Int,
     splitID: Int,
-    attemptID: Int)
+    attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, 
attemptID)
+  def toTaskEndReason: TaskEndReason = new TaskCommitDenied(jobID, splitID, 
attemptNumber)
 
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala 
b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 639ee78..4786e97 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -89,8 +89,7 @@ object SparkHadoopMapRedUtil extends Logging {
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
       jobId: Int,
-      splitId: Int,
-      attemptId: Int): Unit = {
+      splitId: Int): Unit = {
 
     val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
 
@@ -120,7 +119,8 @@ object SparkHadoopMapRedUtil extends Logging {
 
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, 
attemptId)
+        val taskAttemptNumber = TaskContext.get().attemptNumber()
+        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, 
taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -130,7 +130,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new 
attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, attemptId)
+          throw new CommitDeniedException(message, jobId, splitId, 
taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the 
commit coordination
@@ -150,7 +150,6 @@ object SparkHadoopMapRedUtil extends Logging {
       committer,
       mrTaskContext,
       sparkTaskContext.stageId(),
-      sparkTaskContext.partitionId(),
-      sparkTaskContext.attemptNumber())
+      sparkTaskContext.partitionId())
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e212bc3..a9b472b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -925,8 +925,11 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
 
-    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
-      event.taskInfo.attempt, event.reason)
+    outputCommitCoordinator.taskCompleted(
+      stageId,
+      task.partitionId,
+      event.taskInfo.attemptNumber, // this is a task attempt number
+      event.reason)
 
     // The success case is dealt with separately below, since we need to 
compute accumulator
     // updates before posting.

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 4c70958..ee30e70 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, 
taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, 
attemptNumber: Int)
 
 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a 
"first committer wins"
@@ -49,8 +49,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
   private val retryInterval = AkkaUtils.retryWaitMs(conf)
 
   private type StageId = Int
-  private type PartitionId = Long
-  private type TaskAttemptId = Long
+  private type PartitionId = Int
+  private type TaskAttemptNumber = Int
 
   /**
    * Map from active stages's id => partition id => task attempt with 
exclusive lock on committing
@@ -62,7 +62,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
    * Access to this map should be guarded by synchronizing on the 
OutputCommitCoordinator instance.
    */
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap = mutable.Map[StageId, 
mutable.Map[PartitionId, TaskAttemptId]]
+  private type CommittersByStageMap =
+    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures 
are all empty.
@@ -80,14 +81,15 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
    *
    * @param stage the stage number
    * @param partition the partition number
-   * @param attempt a unique identifier for this task attempt
+   * @param attemptNumber how many times this task has been attempted
+   *                      (see [[TaskContext.attemptNumber()]])
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+      attemptNumber: TaskAttemptNumber): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorActor match {
       case Some(actor) =>
         AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, 
retryInterval, timeout)
@@ -100,7 +102,7 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
 
   // Called by DAGScheduler
   private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, 
TaskAttemptId]()
+    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, 
TaskAttemptNumber]()
   }
 
   // Called by DAGScheduler
@@ -112,7 +114,7 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private[scheduler] def taskCompleted(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId,
+      attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
     val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -122,12 +124,12 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
       case Success =>
       // The task output has been committed successfully
       case denied: TaskCommitDenied =>
-        logInfo(
-          s"Task was denied committing, stage: $stage, partition: $partition, 
attempt: $attempt")
+        logInfo(s"Task was denied committing, stage: $stage, partition: 
$partition, " +
+          s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
-          logDebug(s"Authorized committer $attempt (stage=$stage, 
partition=$partition) failed;" +
-            s" clearing lock")
+        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, 
stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
           authorizedCommitters.remove(partition)
         }
     }
@@ -145,21 +147,23 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
   private[scheduler] def handleAskPermissionToCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = synchronized {
+      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
         authorizedCommitters.get(partition) match {
           case Some(existingCommitter) =>
-            logDebug(s"Denying $attempt to commit for stage=$stage, 
partition=$partition; " +
-              s"existingCommitter = $existingCommitter")
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
             false
           case None =>
-            logDebug(s"Authorizing $attempt to commit for stage=$stage, 
partition=$partition")
-            authorizedCommitters(partition) = attempt
+            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
+              s"partition=$partition")
+            authorizedCommitters(partition) = attemptNumber
             true
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing task attempt 
$attempt to commit")
+        logDebug(s"Stage $stage has completed, so not allowing attempt number 
$attemptNumber of" +
+          s"partition $partition to commit")
         false
     }
   }
@@ -172,8 +176,9 @@ private[spark] object OutputCommitCoordinator {
     extends Actor with ActorLogReceive with Logging {
 
     override def receiveWithLogging = {
-      case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
-        sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, 
partition, taskAttempt)
+      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
+        sender ! outputCommitCoordinator.handleAskPermissionToCommit(
+          stage, partition, attemptNumber)
       case StopCoordinator =>
         logInfo("OutputCommitCoordinator stopped!")
         context.stop(self)

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ce..f113c2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
-    val attempt: Int,
+    val attemptNumber: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
     }
   }
 
-  def id: String = s"$index.$attempt"
+  @deprecated("Use attemptNumber", "1.6.0")
+  def attempt: Int = attemptNumber
+
+  def id: String = s"$index.$attemptNumber"
 
   def duration: Long = {
     if (!finished) {

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index a121556..4e54ec8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -517,8 +517,9 @@ private[ui] class StagePage(parent: StagesTab) extends 
WebUIPage("stage") {
       <tr>
         <td>{info.index}</td>
         <td>{info.taskId}</td>
-        <td sorttable_customkey={info.attempt.toString}>{
-          if (info.speculative) s"${info.attempt} (speculative)" else 
info.attempt.toString
+        <td sorttable_customkey={info.attemptNumber.toString}>{
+          if (info.speculative) s"${info.attemptNumber} (speculative)"
+          else info.attemptNumber.toString
         }</td>
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 474f79f..c332765 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -246,7 +246,7 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
-    ("Attempt" -> taskInfo.attempt) ~
+    ("Attempt" -> taskInfo.attemptNumber) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
new file mode 100644
index 0000000..1ae5b03
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.{Span, Seconds}
+
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, 
SparkFunSuite, TaskContext}
+import org.apache.spark.util.Utils
+
+/**
+ * Integration tests for the OutputCommitCoordinator.
+ *
+ * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
+ */
+class OutputCommitCoordinatorIntegrationSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with Timeouts {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val conf = new SparkConf()
+      .set("master", "local[2,4]")
+      .set("spark.speculation", "true")
+      .set("spark.hadoop.mapred.output.committer.class",
+        classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
+    sc = new SparkContext("local[2, 4]", "test", conf)
+  }
+
+  test("exception thrown in OutputCommitter.commitTask()") {
+    // Regression test for SPARK-10381
+    failAfter(Span(60, Seconds)) {
+      val tempDir = Utils.createTempDir()
+      try {
+        sc.parallelize(1 to 4, 
2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
+}
+
+private class ThrowExceptionOnFirstAttemptOutputCommitter extends 
FileOutputCommitter {
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    val ctx = TaskContext.get()
+    if (ctx.attemptNumber < 1) {
+      throw new java.io.FileNotFoundException("Intentional exception")
+    }
+    super.commitTask(context)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 14ecf72..14a0ccf 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
  * was not in SparkHadoopWriter, the tests would still pass because only one 
of the
  * increments would be captured even though the commit in both tasks was 
executed
  * erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests 
that do
+ * not use mocks.
  */
 class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite 
with BeforeAndAfter {
 
   test("Only authorized committer failures can clear the authorized committer 
lock (SPARK-6614)") {
     val stage: Int = 1
-    val partition: Long = 2
-    val authorizedCommitter: Long = 3
-    val nonAuthorizedCommitter: Long = 100
+    val partition: Int = 2
+    val authorizedCommitter: Int = 3
+    val nonAuthorizedCommitter: Int = 100
     outputCommitCoordinator.stageStart(stage)
-    assert(outputCommitCoordinator.canCommit(stage, partition, attempt = 
authorizedCommitter))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter))
+
+    assert(outputCommitCoordinator.canCommit(stage, partition, 
authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter))
     // The non-authorized committer fails
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = nonAuthorizedCommitter, reason = 
TaskKilled)
     // New tasks should still not be able to commit because the authorized 
committer has not failed
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 1))
+      !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 1))
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = authorizedCommitter, reason = 
TaskKilled)
     // A new task should now be allowed to become the authorized committer
     assert(
-      outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 2))
+      outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 2))
     // There can only be one authorized committer
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = 
nonAuthorizedCommitter + 3))
+      !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 3))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index b9d600b..90caf96 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -456,7 +456,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
-    assert(info1.attempt === info2.attempt)
+    assert(info1.attemptNumber === info2.attemptNumber)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ee6229a..31f6781 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -153,6 +153,23 @@ object MimaExcludes {
             
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
             
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
             
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
+          ) ++ Seq(
+            // SPARK-10381 Fix types / units in private 
AskPermissionToCommitOutput RPC message.
+            // This class is marked as `private` but MiMa still seems to be 
confused by the change.
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+               
"org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              
"org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              
"org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
           )
 
         case v if v.startsWith("1.2") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e54525f4/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8398da2..8169e83 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -117,7 +117,7 @@ private[hive] class SparkHiveWriterContainer(
   }
 
   protected def commit() {
-    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, 
attemptID)
+    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
   }
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to