Repository: spark
Updated Branches:
  refs/heads/master 50c727080 -> 10c546e9d


[SPARK-7599] [SQL] Don't restrict customized output committers to be subclasses of FileOutputCommitter

Author: Cheng Lian <l...@databricks.com>

Closes #6118 from liancheng/spark-7599 and squashes the following commits:

31e1bd6 [Cheng Lian] Don't restrict customized output committers to be subclasses of FileOutputCommitter


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10c546e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10c546e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10c546e9

Branch: refs/heads/master
Commit: 10c546e9d42a0f3fbf45c919e74f62c548ca8347
Parents: 50c7270
Author: Cheng Lian <l...@databricks.com>
Authored: Wed May 13 07:35:55 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 13 07:35:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/sources/commands.scala | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10c546e9/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 8372d2c..fe8be5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -244,7 +244,7 @@ private[sql] abstract class BaseWriterContainer(
   @transient private val jobContext: JobContext = job
 
   // The following fields are initialized and used on both driver and executor side.
-  @transient protected var outputCommitter: FileOutputCommitter = _
+  @transient protected var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _
   @transient private var taskId: TaskID = _
   @transient private var taskAttemptId: TaskAttemptID = _
@@ -282,14 +282,18 @@ private[sql] abstract class BaseWriterContainer(
     initWriters()
   }
 
-  private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = {
-    outputFormatClass.newInstance().getOutputCommitter(context) match {
-      case f: FileOutputCommitter => f
-      case f => sys.error(
-        s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.")
+  protected def getWorkPath: String = {
+    outputCommitter match {
+      // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
+      case f: FileOutputCommitter => f.getWorkPath.toString
+      case _ => outputPath
     }
   }
 
+  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    outputFormatClass.newInstance().getOutputCommitter(context)
+  }
+
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)
@@ -339,7 +343,7 @@ private[sql] class DefaultWriterContainer(
 
   override protected def initWriters(): Unit = {
     writer = outputWriterClass.newInstance()
-    writer.init(outputCommitter.getWorkPath.toString, dataSchema, taskAttemptContext)
+    writer.init(getWorkPath, dataSchema, taskAttemptContext)
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -381,7 +385,7 @@ private[sql] class DynamicPartitionWriterContainer(
     }.mkString
 
     outputWriters.getOrElseUpdate(partitionPath, {
-      val path = new Path(outputCommitter.getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
+      val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
       val writer = outputWriterClass.newInstance()
       writer.init(path.toString, dataSchema, taskAttemptContext)
       writer


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to