Repository: spark
Updated Branches:
  refs/heads/branch-2.2 cd51e2c32 -> cfc04e062


[SPARK-22217][SQL] ParquetFileFormat to support arbitrary OutputCommitters

## What changes were proposed in this pull request?

`ParquetFileFormat` relaxes its requirement on the output committer class from 
`org.apache.parquet.hadoop.ParquetOutputCommitter` or a subclass thereof (and so 
implicitly Hadoop `FileOutputCommitter`) to any committer implementing 
`org.apache.hadoop.mapreduce.OutputCommitter`.

This enables output committers which don't write to the filesystem the way 
`FileOutputCommitter` does to save parquet data from a dataframe: at present 
you cannot do this.
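
As a rough sketch of what this permits, assuming a Spark build that includes this patch, a job could select Hadoop's plain `FileOutputCommitter` (which is not a `ParquetOutputCommitter` subclass) through the existing `spark.sql.parquet.output.committer.class` option. The object name, application name, and temporary output path below are illustrative assumptions, not part of the patch.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

// Illustrative only: object name, app name and output location are assumptions.
object NonParquetCommitterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("non-parquet-committer-example")
      .getOrCreate()

    // Any org.apache.hadoop.mapreduce.OutputCommitter should be accepted after
    // this patch; here the stock Hadoop FileOutputCommitter is used.
    spark.conf.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter")
    // Only a ParquetOutputCommitter can write summary metadata, so leave it off.
    spark.conf.set("parquet.enable.summary-metadata", "false")

    // Save into a fresh, not-yet-existing directory under a temp dir.
    val dest = Files.createTempDirectory("committer-example").resolve("out").toString
    spark.createDataFrame(Seq((1, "4"), (2, "2")))
      .write.format("parquet").save(dest)

    spark.stop()
  }
}
```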

Before using a committer which isn't a subclass of `ParquetOutputCommitter`, 
`ParquetFileFormat` checks whether the context has requested summary metadata 
by setting `parquet.enable.summary-metadata`. If it has, and the committer class 
isn't a Parquet committer, it raises a RuntimeException with an error message.

(It could downgrade, of course, but raising an exception makes it clear there 
won't be a summary. It also makes the behaviour testable.)
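
The condition itself can be sketched in plain Scala. The classes below are stand-ins declared only so the snippet runs without Hadoop or Parquet on the classpath, and `SummaryCheck.summaryUnsupported` is a hypothetical helper name. The sketch only computes the condition; how to react to it is left to the caller.

```scala
// Stand-in types (assumptions) mirroring the real class hierarchy, so the
// sketch is self-contained.
class OutputCommitter
class FileOutputCommitter extends OutputCommitter
class ParquetOutputCommitter extends FileOutputCommitter
class MarkingFileOutputCommitter extends FileOutputCommitter

object SummaryCheck {
  /**
   * True when summary metadata is requested but the committer class is not a
   * ParquetOutputCommitter (or subclass), so no summary can be written.
   */
  def summaryUnsupported(
      summaryRequested: Boolean,
      committerClass: Class[_ <: OutputCommitter]): Boolean = {
    summaryRequested &&
      !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)
  }
}
```

With these stand-ins, `SummaryCheck.summaryUnsupported(true, classOf[MarkingFileOutputCommitter])` is the only combination of the four tested cases that flags a problem.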

Note that `SQLConf` already states that any `OutputCommitter` can be used, but 
that typically it's a subclass of ParquetOutputCommitter. That's not currently 
true. This patch will make the code consistent with the docs, adding tests to 
verify.

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, 
`MarkingFileOutputCommitter`, which extends `FileOutputCommitter` and writes a 
marker file in the destination directory. The presence of the marker file can 
be used to verify the new committer was used. The tests then try the 
combinations of Parquet committer summary/no-summary and marking committer 
summary/no-summary.

| committer | summary | outcome |
|-----------|---------|---------|
| parquet   | true    | success |
| parquet   | false   | success |
| marking   | false   | success with marker |
| marking   | true    | exception |
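
The marker-file idea behind these checks can be illustrated without Hadoop. This is a minimal sketch using `java.nio.file` in place of the Hadoop `FileSystem` API; the `LocalMarker` object and its method names are hypothetical stand-ins, not the test suite's code.

```scala
import java.nio.file.{Files, NoSuchFileException, Path}

// Hypothetical local-filesystem analogue of the marker helper in the test suite.
object LocalMarker {
  val Name = "marker"

  /** Create an empty marker file in the destination directory if absent. */
  def touch(dir: Path): Unit = {
    val marker = dir.resolve(Name)
    if (!Files.exists(marker)) Files.createFile(marker)
  }

  /** Return the marker path, or throw if the marker was never written. */
  def check(dir: Path): Path = {
    val marker = dir.resolve(Name)
    if (!Files.exists(marker)) throw new NoSuchFileException(marker.toString)
    marker
  }
}
```

A committer that calls `touch` when the job commits leaves evidence a test can look for with `check`; a missing marker means a different committer ran.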

All tests are happy.

Author: Steve Loughran <[email protected]>

Closes #19448 from steveloughran/cloud/SPARK-22217-committer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfc04e06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfc04e06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfc04e06

Branch: refs/heads/branch-2.2
Commit: cfc04e062b4f3b14d5b846f06c9c85bb2e21cf0a
Parents: cd51e2c
Author: Steve Loughran <[email protected]>
Authored: Fri Oct 13 08:40:26 2017 +0900
Committer: hyukjinkwon <[email protected]>
Committed: Fri Oct 13 10:00:33 2017 +0900

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   5 +-
 .../datasources/parquet/ParquetFileFormat.scala |  12 +-
 .../parquet/ParquetCommitterSuite.scala         | 152 +++++++++++++++++++
 3 files changed, 165 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfc04e06/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 79398fb..4c29f8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -268,8 +268,9 @@ object SQLConf {
 
   val PARQUET_OUTPUT_COMMITTER_CLASS = 
buildConf("spark.sql.parquet.output.committer.class")
     .doc("The output committer class used by Parquet. The specified class 
needs to be a " +
-      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, 
it's also a subclass " +
-      "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, 
it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then 
metadata summaries " +
+      "will never be created, irrespective of the value of 
parquet.enable.summary-metadata")
     .internal()
     .stringConf
     .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")

http://git-wip-us.apache.org/repos/asf/spark/blob/cfc04e06/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 87fbf8b..1d60495 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -85,7 +85,7 @@ class ParquetFileFormat
       conf.getClass(
         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[ParquetOutputCommitter],
-        classOf[ParquetOutputCommitter])
+        classOf[OutputCommitter])
 
     if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
@@ -97,7 +97,7 @@ class ParquetFileFormat
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
-      classOf[ParquetOutputCommitter])
+      classOf[OutputCommitter])
 
     // We're not really using `ParquetOutputFormat[Row]` for writing data 
here, because we override
     // it in `ParquetOutputWriter` to support appending and dynamic 
partitioning.  The reason why
@@ -137,6 +137,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter 
and cannot" +
+        s" create job summaries. " +
+        s"Set Parquet option ${ParquetOutputFormat.ENABLE_JOB_SUMMARY} to 
false.")
+    }
+
     new OutputWriterFactory {
       // This OutputWriterFactory instance is deserialized when writing 
Parquet files on the
       // executor side without constructing or deserializing 
ParquetFileFormat. Therefore, we hold

http://git-wip-us.apache.org/repos/asf/spark/blob/cfc04e06/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
new file mode 100644
index 0000000..caa4f6d
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output committers.
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = 
classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe 
and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (spark != null) {
+        spark.stop()
+        spark = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("alternative output committer, merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, summary = true, check = true)
+  }
+
+  test("alternative output committer, no merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, summary = false, check = true)
+  }
+
+  test("Parquet output committer, merge schema") {
+    writeDataFrame(PARQUET_COMMITTER, summary = true, check = false)
+  }
+
+  test("Parquet output committer, no merge schema") {
+    writeDataFrame(PARQUET_COMMITTER, summary = false, check = false)
+  }
+
+  /**
+   * Write a trivial dataframe as Parquet, using the given committer
+   * and job summary option.
+   * @param committer committer to use
+   * @param summary create a job summary
+   * @param check look for a marker file
+   * @return if a marker file was sought, its file status.
+   */
+  private def writeDataFrame(
+      committer: String,
+      summary: Boolean,
+      check: Boolean): Option[FileStatus] = {
+    var result: Option[FileStatus] = None
+    withSQLConf(
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer,
+      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> summary.toString) {
+        withTempPath { dest =>
+          val df = spark.createDataFrame(Seq((1, "4"), (2, "2")))
+          val destPath = new Path(dest.toURI)
+          df.write.format("parquet").save(destPath.toString)
+          if (check) {
+            result = Some(MarkingFileOutput.checkMarker(
+              destPath,
+              spark.sparkContext.hadoopConfiguration))
+          }
+        }
+    }
+    result
+  }
+}
+
+/**
+ * A file output committer which explicitly touches a file "marker"; this
+ * is how tests can verify that this committer was used.
+ * @param outputPath output path
+ * @param context task context
+ */
+private class MarkingFileOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext) extends FileOutputCommitter(outputPath, 
context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    super.commitJob(context)
+    MarkingFileOutput.touch(outputPath, context.getConfiguration)
+  }
+}
+
+private object MarkingFileOutput {
+
+  val COMMITTER = classOf[MarkingFileOutputCommitter].getCanonicalName
+
+  /**
+   * Touch the marker.
+   * @param outputPath destination directory
+   * @param conf configuration to create the FS with
+   */
+  def touch(outputPath: Path, conf: Configuration): Unit = {
+    outputPath.getFileSystem(conf).create(new Path(outputPath, 
"marker")).close()
+  }
+
+  /**
+   * Get the file status of the marker
+   *
+   * @param outputPath destination directory
+   * @param conf configuration to create the FS with
+   * @return the status of the marker
+   * @throws FileNotFoundException if the marker is absent
+   */
+  def checkMarker(outputPath: Path, conf: Configuration): FileStatus = {
+    outputPath.getFileSystem(conf).getFileStatus(new Path(outputPath, 
"marker"))
+  }
+}

