Repository: spark
Updated Branches:
  refs/heads/master 50c727080 -> 10c546e9d
[SPARK-7599] [SQL] Don't restrict customized output committers to be subclasses of FileOutputCommitter Author: Cheng Lian <l...@databricks.com> Closes #6118 from liancheng/spark-7599 and squashes the following commits: 31e1bd6 [Cheng Lian] Don't restrict customized output committers to be subclasses of FileOutputCommitter Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10c546e9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10c546e9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10c546e9 Branch: refs/heads/master Commit: 10c546e9d42a0f3fbf45c919e74f62c548ca8347 Parents: 50c7270 Author: Cheng Lian <l...@databricks.com> Authored: Wed May 13 07:35:55 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Wed May 13 07:35:55 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/sources/commands.scala | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/10c546e9/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 8372d2c..fe8be5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -244,7 +244,7 @@ private[sql] abstract class BaseWriterContainer( @transient private val jobContext: JobContext = job // The following fields are initialized and used on both driver and executor side. 
- @transient protected var outputCommitter: FileOutputCommitter = _ + @transient protected var outputCommitter: OutputCommitter = _ @transient private var jobId: JobID = _ @transient private var taskId: TaskID = _ @transient private var taskAttemptId: TaskAttemptID = _ @@ -282,14 +282,18 @@ private[sql] abstract class BaseWriterContainer( initWriters() } - private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = { - outputFormatClass.newInstance().getOutputCommitter(context) match { - case f: FileOutputCommitter => f - case f => sys.error( - s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.") + protected def getWorkPath: String = { + outputCommitter match { + // FileOutputCommitter writes to a temporary location returned by `getWorkPath`. + case f: FileOutputCommitter => f.getWorkPath.toString + case _ => outputPath } } + private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { + outputFormatClass.newInstance().getOutputCommitter(context) + } + private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = { this.jobId = SparkHadoopWriter.createJobID(new Date, jobId) this.taskId = new TaskID(this.jobId, true, splitId) @@ -339,7 +343,7 @@ private[sql] class DefaultWriterContainer( override protected def initWriters(): Unit = { writer = outputWriterClass.newInstance() - writer.init(outputCommitter.getWorkPath.toString, dataSchema, taskAttemptContext) + writer.init(getWorkPath, dataSchema, taskAttemptContext) } override def outputWriterForRow(row: Row): OutputWriter = writer @@ -381,7 +385,7 @@ private[sql] class DynamicPartitionWriterContainer( }.mkString outputWriters.getOrElseUpdate(partitionPath, { - val path = new Path(outputCommitter.getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR)) + val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR)) val writer = outputWriterClass.newInstance() writer.init(path.toString, dataSchema, taskAttemptContext) 
writer

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org