Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c4c5328f2 -> bc7f05f5f


[SPARK-18219] Move commit protocol API (internal) from sql/core to core module

## What changes were proposed in this pull request?
This patch moves the new commit protocol API from sql/core to the core module, so 
we can use it in the future in the RDD API.

As part of this patch, I also moved the specification of the random uuid for the 
write path out of the commit protocol, and instead pass in a job id.

## How was this patch tested?
N/A

Author: Reynold Xin <r...@databricks.com>

Closes #15731 from rxin/SPARK-18219.

(cherry picked from commit 937af592e65f4dd878aafcabf8fe2cfe7fa3d9b3)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc7f05f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc7f05f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc7f05f5

Branch: refs/heads/branch-2.1
Commit: bc7f05f5f03653c623190b8178bcbe981a41c2f3
Parents: c4c5328
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Nov 3 02:42:48 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Nov 3 02:43:03 2016 -0700

----------------------------------------------------------------------
 .../spark/internal/io/FileCommitProtocol.scala  | 126 +++++++++
 .../io/HadoopMapReduceCommitProtocol.scala      | 111 ++++++++
 .../datasources/FileCommitProtocol.scala        | 257 -------------------
 .../datasources/FileFormatWriter.scala          |   3 +-
 .../InsertIntoHadoopFsRelationCommand.scala     |   6 +-
 .../SQLHadoopMapReduceCommitProtocol.scala      |  72 ++++++
 .../execution/streaming/FileStreamSink.scala    |   9 +-
 .../streaming/ManifestFileCommitProtocol.scala  |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   4 +-
 9 files changed, 327 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala 
b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
new file mode 100644
index 0000000..fb80205
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.util.Utils
+
+
+/**
+ * An interface to define how a single Spark job commits its outputs. Two 
notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance 
instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. Implementations should have a constructor with either 2 or 3 arguments:
+ *    (jobId: String, path: String) or (jobId: String, path: String, isAppend: 
Boolean).
+ * 3. A committer should not be reused across multiple Spark jobs.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
+ *    (or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
+ *    failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Setups up a job. Must be called on the driver before any other methods 
can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): 
Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible 
that the driver
+   * just crashes (or killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Notifies the commit protocol to add a new file, and gets back the full 
path that should be
+   * used. Must be called on the executors when running tasks.
+   *
+   * Note that the returned temp file may have an arbitrary path. The commit 
protocol only
+   * promises that the file will be at the location specified by the arguments 
after job commit.
+   *
+   * A full file path consists of the following parts:
+   *  1. the base path
+   *  2. some sub-directory within the base path, used to specify partitioning
+   *  3. file prefix, usually some unique job id with the task id
+   *  4. bucket id
+   *  5. source specific file extension, e.g. ".snappy.parquet"
+   *
+   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 
5, and the rest
+   * are left to the commit protocol implementation to decide.
+   */
+  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], 
ext: String): String
+
+  /**
+   * Commits a task after the writes succeed. Must be called on the executors 
when running tasks.
+   */
+  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
+
+  /**
+   * Aborts a task after the writes have failed. Must be called on the 
executors when running tasks.
+   *
+   * Calling this function is a best-effort attempt, because it is possible 
that the executor
+   * just crashes (or killed) before it can call abort.
+   */
+  def abortTask(taskContext: TaskAttemptContext): Unit
+}
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(val obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(null)
+
+  /**
+   * Instantiates a FileCommitProtocol using the given className.
+   */
+  def instantiate(className: String, jobId: String, outputPath: String, 
isAppend: Boolean)
+    : FileCommitProtocol = {
+    val clazz = 
Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
+
+    // First try the one with argument (jobId: String, outputPath: String, 
isAppend: Boolean).
+    // If that doesn't exist, try the one with (jobId: string, outputPath: 
String).
+    try {
+      val ctor = clazz.getDeclaredConstructor(classOf[String], 
classOf[String], classOf[Boolean])
+      ctor.newInstance(jobId, outputPath, 
isAppend.asInstanceOf[java.lang.Boolean])
+    } catch {
+      case _: NoSuchMethodException =>
+        val ctor = clazz.getDeclaredConstructor(classOf[String], 
classOf[String])
+        ctor.newInstance(jobId, outputPath)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
new file mode 100644
index 0000000..66ccb6d
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+
+/**
+ * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop 
OutputCommitter
+ * (from the newer mapreduce API, not the old mapred API).
+ *
+ * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ */
+class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+  extends FileCommitProtocol with Serializable with Logging {
+
+  import FileCommitProtocol._
+
+  /** OutputCommitter from Hadoop is not serializable so marking it transient. 
*/
+  @transient private var committer: OutputCommitter = _
+
+  protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = 
{
+    context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+  }
+
+  override def newTaskTempFile(
+      taskContext: TaskAttemptContext, dir: Option[String], ext: String): 
String = {
+    // The file name looks like 
part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the split number, so if we have more 
than 100000 tasks,
+    // the file name is fine and won't overflow.
+    val split = taskContext.getTaskAttemptID.getTaskID.getId
+    val filename = f"part-$split%05d-$jobId$ext"
+
+    val stagingDir: String = committer match {
+      // For FileOutputCommitter it has its own staging path called "work 
path".
+      case f: FileOutputCommitter => 
Option(f.getWorkPath.toString).getOrElse(path)
+      case _ => path
+    }
+
+    dir.map { d =>
+      new Path(new Path(stagingDir, d), filename).toString
+    }.getOrElse {
+      new Path(stagingDir, filename).toString
+    }
+  }
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    // Setup IDs
+    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+    // Set up the configuration object
+    jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
+    jobContext.getConfiguration.set("mapred.tip.id", 
taskAttemptId.getTaskID.toString)
+    jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
+    jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
+    jobContext.getConfiguration.setInt("mapred.task.partition", 0)
+
+    val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
+    committer = setupCommitter(taskAttemptContext)
+    committer.setupJob(jobContext)
+  }
+
+  override def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit = {
+    committer.commitJob(jobContext)
+  }
+
+  override def abortJob(jobContext: JobContext): Unit = {
+    committer.abortJob(jobContext, JobStatus.State.FAILED)
+  }
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    committer = setupCommitter(taskContext)
+    committer.setupTask(taskContext)
+  }
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
+    val attemptId = taskContext.getTaskAttemptID
+    SparkHadoopMapRedUtil.commitTask(
+      committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
+    EmptyTaskCommitMessage
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {
+    committer.abortTask(taskContext)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
deleted file mode 100644
index f5dd5ce..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.{Date, UUID}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import org.apache.spark.SparkHadoopWriter
-import org.apache.spark.internal.Logging
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-
-object FileCommitProtocol {
-  class TaskCommitMessage(val obj: Any) extends Serializable
-
-  object EmptyTaskCommitMessage extends TaskCommitMessage(null)
-
-  /**
-   * Instantiates a FileCommitProtocol using the given className.
-   */
-  def instantiate(className: String, outputPath: String, isAppend: Boolean): 
FileCommitProtocol = {
-    try {
-      val clazz = 
Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-
-      // First try the one with argument (outputPath: String, isAppend: 
Boolean).
-      // If that doesn't exist, try the one with (outputPath: String).
-      try {
-        val ctor = clazz.getDeclaredConstructor(classOf[String], 
classOf[Boolean])
-        ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean])
-      } catch {
-        case _: NoSuchMethodException =>
-          val ctor = clazz.getDeclaredConstructor(classOf[String])
-          ctor.newInstance(outputPath)
-      }
-    } catch {
-      case e: ClassNotFoundException =>
-        throw e
-    }
-  }
-}
-
-
-/**
- * An interface to define how a single Spark job commits its outputs. Two 
notes:
- *
- * 1. Implementations must be serializable, as the committer instance 
instantiated on the driver
- *    will be used for tasks on executors.
- * 2. A committer should not be reused across multiple Spark jobs.
- *
- * The proper call sequence is:
- *
- * 1. Driver calls setupJob.
- * 2. As part of each task's execution, executor calls setupTask and then 
commitTask
- *    (or abortTask if task failed).
- * 3. When all necessary tasks completed successfully, the driver calls 
commitJob. If the job
- *    failed to execute (e.g. too many failed tasks), the job should call 
abortJob.
- */
-abstract class FileCommitProtocol {
-  import FileCommitProtocol._
-
-  /**
-   * Setups up a job. Must be called on the driver before any other methods 
can be invoked.
-   */
-  def setupJob(jobContext: JobContext): Unit
-
-  /**
-   * Commits a job after the writes succeed. Must be called on the driver.
-   */
-  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): 
Unit
-
-  /**
-   * Aborts a job after the writes fail. Must be called on the driver.
-   *
-   * Calling this function is a best-effort attempt, because it is possible 
that the driver
-   * just crashes (or killed) before it can call abort.
-   */
-  def abortJob(jobContext: JobContext): Unit
-
-  /**
-   * Sets up a task within a job.
-   * Must be called before any other task related methods can be invoked.
-   */
-  def setupTask(taskContext: TaskAttemptContext): Unit
-
-  /**
-   * Notifies the commit protocol to add a new file, and gets back the full 
path that should be
-   * used. Must be called on the executors when running tasks.
-   *
-   * Note that the returned temp file may have an arbitrary path. The commit 
protocol only
-   * promises that the file will be at the location specified by the arguments 
after job commit.
-   *
-   * A full file path consists of the following parts:
-   *  1. the base path
-   *  2. some sub-directory within the base path, used to specify partitioning
-   *  3. file prefix, usually some unique job id with the task id
-   *  4. bucket id
-   *  5. source specific file extension, e.g. ".snappy.parquet"
-   *
-   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 
5, and the rest
-   * are left to the commit protocol implementation to decide.
-   */
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], 
ext: String): String
-
-  /**
-   * Commits a task after the writes succeed. Must be called on the executors 
when running tasks.
-   */
-  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
-
-  /**
-   * Aborts a task after the writes have failed. Must be called on the 
executors when running tasks.
-   *
-   * Calling this function is a best-effort attempt, because it is possible 
that the executor
-   * just crashes (or killed) before it can call abort.
-   */
-  def abortTask(taskContext: TaskAttemptContext): Unit
-}
-
-
-/**
- * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop 
OutputCommitter
- * (from the newer mapreduce API, not the old mapred API).
- *
- * Unlike Hadoop's OutputCommitter, this implementation is serializable.
- */
-class HadoopCommitProtocolWrapper(path: String, isAppend: Boolean)
-  extends FileCommitProtocol with Serializable with Logging {
-
-  import FileCommitProtocol._
-
-  /** OutputCommitter from Hadoop is not serializable so marking it transient. 
*/
-  @transient private var committer: OutputCommitter = _
-
-  /** UUID used to identify the job in file name. */
-  private val uuid: String = UUID.randomUUID().toString
-
-  private def setupCommitter(context: TaskAttemptContext): Unit = {
-    committer = 
context.getOutputFormatClass.newInstance().getOutputCommitter(context)
-
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the 
output committer
-      // associated with the file output format since it is not safe to use a 
custom
-      // committer for appending. For example, in S3, direct parquet output 
committer may
-      // leave partial data in the destination dir when the appending job 
fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, 
classOf[OutputCommitter])
-
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
-
-        // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output 
committer,
-        // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs 
to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
-      }
-    }
-    logInfo(s"Using output committer class 
${committer.getClass.getCanonicalName}")
-  }
-
-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): 
String = {
-    // The file name looks like 
part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more 
than 100000 tasks,
-    // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    val filename = f"part-$split%05d-$uuid$ext"
-
-    val stagingDir: String = committer match {
-      // For FileOutputCommitter it has its own staging path called "work 
path".
-      case f: FileOutputCommitter => 
Option(f.getWorkPath.toString).getOrElse(path)
-      case _ => path
-    }
-
-    dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
-    }.getOrElse {
-      new Path(stagingDir, filename).toString
-    }
-  }
-
-  override def setupJob(jobContext: JobContext): Unit = {
-    // Setup IDs
-    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
-    val taskAttemptId = new TaskAttemptID(taskId, 0)
-
-    // Set up the configuration object
-    jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
-    jobContext.getConfiguration.set("mapred.tip.id", 
taskAttemptId.getTaskID.toString)
-    jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
-    jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
-    jobContext.getConfiguration.setInt("mapred.task.partition", 0)
-
-    val taskAttemptContext = new 
TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
-    setupCommitter(taskAttemptContext)
-
-    committer.setupJob(jobContext)
-  }
-
-  override def commitJob(jobContext: JobContext, taskCommits: 
Seq[TaskCommitMessage]): Unit = {
-    committer.commitJob(jobContext)
-  }
-
-  override def abortJob(jobContext: JobContext): Unit = {
-    committer.abortJob(jobContext, JobStatus.State.FAILED)
-  }
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {
-    setupCommitter(taskContext)
-    committer.setupTask(taskContext)
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage 
= {
-    val attemptId = taskContext.getTaskAttemptID
-    SparkHadoopMapRedUtil.commitTask(
-      committer, taskContext, attemptId.getJobID.getId, 
attemptId.getTaskID.getId)
-    EmptyTaskCommitMessage
-  }
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    committer.abortTask(taskContext)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index bc00a0a..e404dcd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -29,6 +29,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -37,7 +39,6 @@ import 
org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
-import 
org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 230c74a..927c0c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -86,8 +87,9 @@ case class InsertIntoHadoopFsRelationCommand(
     if (doInsertion) {
       val committer = FileCommitProtocol.instantiate(
         sparkSession.sessionState.conf.fileCommitProtocolClass,
-        outputPath.toString,
-        isAppend)
+        jobId = java.util.UUID.randomUUID().toString,
+        outputPath = outputPath.toString,
+        isAppend = isAppend)
 
       FileFormatWriter.write(
         sparkSession = sparkSession,

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
new file mode 100644
index 0000000..9b9ed28
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the 
actual
+ * Hadoop output committer using an option specified in SQLConf.
+ */
+class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: 
Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with 
Logging {
+
+  override protected def setupCommitter(context: TaskAttemptContext): 
OutputCommitter = {
+    var committer = 
context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (!isAppend) {
+      // If we are appending data to an existing dir, we will only use the 
output committer
+      // associated with the file output format since it is not safe to use a 
custom
+      // committer for appending. For example, in S3, direct parquet output 
committer may
+      // leave partial data in the destination dir when the appending job 
fails.
+      // See SPARK-8578 for more details.
+      val configuration = context.getConfiguration
+      val clazz =
+        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, 
classOf[OutputCommitter])
+
+      if (clazz != null) {
+        logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
+
+        // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output 
committer,
+        // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs 
to set the
+        // output committer in prepareForWrite method.
+        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
+          committer = ctor.newInstance(new Path(path), context)
+        } else {
+          // The specified output committer is just an OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          committer = ctor.newInstance()
+        }
+      }
+    }
+    logInfo(s"Using output committer class 
${committer.getClass.getCanonicalName}")
+    committer
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index daec2b5..e849caf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.{FileCommitProtocol, FileFormat, FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -54,7 +55,11 @@ class FileStreamSink(
       logInfo(s"Skipping already committed batch $batchId")
     } else {
       val committer = FileCommitProtocol.instantiate(
-        sparkSession.sessionState.conf.streamingFileCommitProtocolClass, path, isAppend = false)
+        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
+        jobId = batchId.toString,
+        outputPath = path,
+        isAppend = false)
+
       committer match {
         case manifestCommitter: ManifestFileCommitProtocol =>
           manifestCommitter.setupManifestOptions(fileLog, batchId)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 5103122..1fe13fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 
 /**
  * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitM
  *
  * @param path path to write the final output to.
  */
-class ManifestFileCommitProtocol(path: String)
+class ManifestFileCommitProtocol(jobId: String, path: String)
   extends FileCommitProtocol with Serializable with Logging {
 
   // Track the list of files added by a task, only used on the executors.

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7f05f5/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7bb3ac0..7b8ed65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
 import org.apache.spark.util.Utils
 
@@ -385,7 +385,7 @@ object SQLConf {
     SQLConfigBuilder("spark.sql.sources.commitProtocolClass")
       .internal()
       .stringConf
-      .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName)
+      .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to