Repository: spark
Updated Branches:
  refs/heads/master 02218c4c7 -> 9104add4c


[SPARK-22217][SQL] ParquetFileFormat to support arbitrary OutputCommitters

## What changes were proposed in this pull request?

`ParquetFileFormat` relaxes its requirement on the output committer class from 
`org.apache.parquet.hadoop.ParquetOutputCommitter` or a subclass thereof (and so, 
implicitly, Hadoop's `FileOutputCommitter`) to any committer implementing 
`org.apache.hadoop.mapreduce.OutputCommitter`.

This enables output committers which don't write to the filesystem the way 
`FileOutputCommitter` does to save Parquet data from a DataFrame; at present 
this is not possible.
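
For example, with this change an `OutputCommitter` that is not a 
`ParquetOutputCommitter` can be selected through the existing 
`spark.sql.parquet.output.committer.class` setting. A minimal sketch 
(`com.example.ObjectStoreCommitter` is a made-up placeholder for such a 
committer, not a real class):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("custom-parquet-committer")
  // Any org.apache.hadoop.mapreduce.OutputCommitter implementation is now
  // accepted here; com.example.ObjectStoreCommitter is a hypothetical example.
  .config("spark.sql.parquet.output.committer.class", "com.example.ObjectStoreCommitter")
  // A non-Parquet committer cannot write _metadata summaries, so keep them disabled.
  .config("parquet.enable.summary-metadata", "false")
  .getOrCreate()

spark.range(10).write.parquet("/tmp/parquet-demo")
```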

Before using a committer which isn't a subclass of `ParquetOutputCommitter`, it 
checks whether the context has requested summary metadata via 
`parquet.enable.summary-metadata`. If that option is true and the committer 
class isn't a Parquet committer, it logs a warning explaining that no summary 
will be created and that the option should be set to false.

(It could fail instead, of course, but downgrading to a warning keeps the write 
working while making it clear there won't be a summary. It also keeps the 
behaviour testable.)

Note that `SQLConf` already states that any `OutputCommitter` can be used, but 
that typically it's a subclass of `ParquetOutputCommitter`. That isn't currently 
true. This patch makes the code consistent with the docs and adds tests to 
verify it.

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, 
`MarkingFileOutputCommitter`, which extends `FileOutputCommitter` and writes a 
marker file in the destination directory. The presence of the marker file 
verifies that the new committer was used. The tests then try the combinations 
of the Parquet committer and the marking committer, each with summaries enabled 
and disabled.

| committer | summary | outcome |
|-----------|---------|---------|
| parquet   | true    | success |
| parquet   | false   | success |
| marking   | false   | success with marker |
| marking   | true    | success with marker; warning logged, no summary |

All tests are happy.
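
For reference, each entry in the matrix reduces to a write under `withSQLConf`, 
roughly as in this simplified extract from the new suite (it assumes the 
suite's imports and the `SQLTestUtils` helpers):

```scala
// One matrix case: marking committer, job summaries disabled.
withSQLConf(
    SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> MarkingFileOutput.COMMITTER,
    ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "false") {
  withTempPath { dest =>
    val destPath = new Path(dest.toURI)
    spark.createDataFrame(Seq((1, "4"), (2, "2")))
      .write.format("parquet").save(destPath.toString)
    // Throws FileNotFoundException unless the marking committer committed the job.
    MarkingFileOutput.checkMarker(destPath, spark.sparkContext.hadoopConfiguration)
  }
}
```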

Author: Steve Loughran <ste...@hortonworks.com>

Closes #19448 from steveloughran/cloud/SPARK-22217-committer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9104add4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9104add4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9104add4

Branch: refs/heads/master
Commit: 9104add4c7c6b578df15b64a8533a1266f90734e
Parents: 02218c4
Author: Steve Loughran <ste...@hortonworks.com>
Authored: Fri Oct 13 08:40:26 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Fri Oct 13 08:40:26 2017 +0900

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   5 +-
 .../datasources/parquet/ParquetFileFormat.scala |  12 +-
 .../parquet/ParquetCommitterSuite.scala         | 152 +++++++++++++++++++
 3 files changed, 165 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9104add4/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5832374..618d4a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -306,8 +306,9 @@ object SQLConf {
 
   val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
     .doc("The output committer class used by Parquet. The specified class needs to be a " +
-      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass " +
-      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata summaries " +
+      "will never be created, irrespective of the value of parquet.enable.summary-metadata")
     .internal()
     .stringConf
     .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")

http://git-wip-us.apache.org/repos/asf/spark/blob/9104add4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index e1e7405..c1535ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -86,7 +86,7 @@ class ParquetFileFormat
       conf.getClass(
         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
+        classOf[OutputCommitter])
 
     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
@@ -98,7 +98,7 @@ class ParquetFileFormat
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
-      classOf[ParquetOutputCommitter])
+      classOf[OutputCommitter])
 
     // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
     // it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The reason why
@@ -138,6 +138,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
+        s" create job summaries. " +
+        s"Set Parquet option ${ParquetOutputFormat.ENABLE_JOB_SUMMARY} to false.")
+    }
+
     new OutputWriterFactory {
       // This OutputWriterFactory instance is deserialized when writing Parquet files on the
       // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold

http://git-wip-us.apache.org/repos/asf/spark/blob/9104add4/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
new file mode 100644
index 0000000..caa4f6d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output committers.
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (spark != null) {
+        spark.stop()
+        spark = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("alternative output committer, merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, summary = true, check = true)
+  }
+
+  test("alternative output committer, no merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, summary = false, check = true)
+  }
+
+  test("Parquet output committer, merge schema") {
+    writeDataFrame(PARQUET_COMMITTER, summary = true, check = false)
+  }
+
+  test("Parquet output committer, no merge schema") {
+    writeDataFrame(PARQUET_COMMITTER, summary = false, check = false)
+  }
+
+  /**
+   * Write a trivial dataframe as Parquet, using the given committer
+   * and job summary option.
+   * @param committer committer to use
+   * @param summary create a job summary
+   * @param check look for a marker file
+   * @return if a marker file was sought, its file status.
+   */
+  private def writeDataFrame(
+      committer: String,
+      summary: Boolean,
+      check: Boolean): Option[FileStatus] = {
+    var result: Option[FileStatus] = None
+    withSQLConf(
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer,
+      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> summary.toString) {
+        withTempPath { dest =>
+          val df = spark.createDataFrame(Seq((1, "4"), (2, "2")))
+          val destPath = new Path(dest.toURI)
+          df.write.format("parquet").save(destPath.toString)
+          if (check) {
+            result = Some(MarkingFileOutput.checkMarker(
+              destPath,
+              spark.sparkContext.hadoopConfiguration))
+          }
+        }
+    }
+    result
+  }
+}
+
+/**
+ * A file output committer which explicitly touches a file "marker"; this
+ * is how tests can verify that this committer was used.
+ * @param outputPath output path
+ * @param context task context
+ */
+private class MarkingFileOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext) extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    super.commitJob(context)
+    MarkingFileOutput.touch(outputPath, context.getConfiguration)
+  }
+}
+
+private object MarkingFileOutput {
+
+  val COMMITTER = classOf[MarkingFileOutputCommitter].getCanonicalName
+
+  /**
+   * Touch the marker.
+   * @param outputPath destination directory
+   * @param conf configuration to create the FS with
+   */
+  def touch(outputPath: Path, conf: Configuration): Unit = {
+    outputPath.getFileSystem(conf).create(new Path(outputPath, "marker")).close()
+  }
+
+  /**
+   * Get the file status of the marker
+   *
+   * @param outputPath destination directory
+   * @param conf configuration to create the FS with
+   * @return the status of the marker
+   * @throws FileNotFoundException if the marker is absent
+   */
+  def checkMarker(outputPath: Path, conf: Configuration): FileStatus = {
+    outputPath.getFileSystem(conf).getFileStatus(new Path(outputPath, "marker"))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
