Repository: spark
Updated Branches:
  refs/heads/branch-1.4 792ed7a4b -> 0605e0843


[SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class

`HadoopFsRelation` subclasses, especially `ParquetRelation2`, should set their own output format class, so that the default output committer can be set up correctly when appending (where user-defined output committers are ignored).
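
For context, a minimal sketch (an illustrative helper, not code from this commit) of how the default committer is derived from the job's registered output format on that appending code path:

    import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}

    // Hedged sketch: when user-defined committers are ignored, the committer
    // ultimately comes from the job's output format, so each HadoopFsRelation
    // subclass must register the format whose committer it actually wants.
    def defaultCommitter(job: Job, context: TaskAttemptContext): OutputCommitter =
      job.getOutputFormatClass.newInstance().getOutputCommitter(context)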

Author: Cheng Lian <l...@databricks.com>

Closes #6998 from liancheng/spark-8604 and squashes the following commits:

9be51d1 [Cheng Lian] Adds more comments
6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class

(cherry picked from commit c337844ed7f9b2cb7b217dc935183ef5e1096ca1)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0605e084
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0605e084
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0605e084

Branch: refs/heads/branch-1.4
Commit: 0605e08434e8c1d5f7d6ef766ea6eb94ba6ac92f
Parents: 792ed7a
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Jun 25 00:06:23 2015 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jun 25 00:07:01 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   |  6 ++++++
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 12 ++++++++++-
 .../spark/sql/sources/SimpleTextRelation.scala  |  2 ++
 .../sql/sources/hadoopFsRelationSuites.scala    | 21 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index abf9614..36b0047 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -195,6 +195,12 @@ private[sql] class ParquetRelation2(
       committerClass,
       classOf[ParquetOutputCommitter])
 
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The reason why
+    // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.

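With `ParquetOutputFormat[Row]` registered, the committer derived from the output format is a `ParquetOutputCommitter`, whose job-commit step writes the `_metadata` and `_common_metadata` summary files exercised by the new test below. A minimal sketch of producing those files directly (assuming the pre-rename `parquet.hadoop` package that Spark 1.4 depends on):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import parquet.hadoop.ParquetOutputCommitter

    // Hedged sketch: writes _metadata and _common_metadata for the Parquet
    // files already present under `outputPath`, mirroring what commitJob does.
    def writeSummaries(conf: Configuration, outputPath: Path): Unit =
      ParquetOutputCommitter.writeMetaDataFile(conf, outputPath)
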
http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 101f2ff..3713f6f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
@@ -193,6 +193,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,

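Note that `OrcOutputFormat` implements the legacy `org.apache.hadoop.mapred.OutputFormat` interface rather than the new `mapreduce` one, so `Job.setOutputFormatClass` cannot be used here; both branches above populate the same legacy configuration key. A small sketch of verifying that (illustrative, not part of this commit):

    import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
    import org.apache.hadoop.mapred.{OutputFormat => MapRedOutputFormat}
    import org.apache.hadoop.mapreduce.Job

    // Hedged sketch: JobConf.setOutputFormat and the explicit setClass call
    // both record the class under "mapred.output.format.class".
    def orcFormatIsSet(job: Job): Boolean =
      job.getConfiguration.getClass(
        "mapred.output.format.class", null, classOf[MapRedOutputFormat[_, _]]) == classOf[OrcOutputFormat]
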
http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 5d7cd16..e814192 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
         path: String,
         dataSchema: StructType,

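The same rule applies to the test relation: `TextOutputFormat` is a `FileOutputFormat`, so the committer derived from it is the standard `FileOutputCommitter`. A minimal sketch of that derivation (illustrative only):

    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, TextOutputFormat}

    // Hedged sketch: without a registered output format, appends would have no
    // format to derive a committer from; TextOutputFormat yields the standard
    // FileOutputCommitter.
    def textCommitter(context: TaskAttemptContext): FileOutputCommitter =
      new TextOutputFormat[Any, Any]().getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
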
http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 619bfba..5f413a8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing 
appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }

