This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ac6163  [SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2]
2ac6163 is described below

commit 2ac6163a5d04027ef4dbdf7d031cddf9415ed25e
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Thu Aug 15 09:38:31 2019 -0700

    [SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2]
    
    This patch adds the binding classes to enable Spark to switch dataframe
    output to using the S3A zero-rename committers shipping in Hadoop 3.1+.
    It adds a source tree into the hadoop-cloud-storage module which only
    compiles with the hadoop-3.2 profile, and contains a binding for normal
    output and a specific bridge class for Parquet (as the Parquet output
    format requires a subclass of `ParquetOutputCommitter`).
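
    For illustration, the new protocol can also be instantiated directly, as
    the `CommitterBindingSuite` added below does (a sketch; the job ID and
    destination path are placeholders):

        import org.apache.spark.internal.io.FileCommitProtocol
        import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

        // Reflective instantiation, as Spark SQL does when
        // spark.sql.sources.commitProtocolClass names the new class.
        val protocol = FileCommitProtocol.instantiate(
          classOf[PathOutputCommitProtocol].getName,
          "job_0001",            // placeholder job ID
          "file:///tmp/output",  // placeholder destination
          false)                 // dynamicPartitionOverwrite is not supported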
    
    Commit algorithms are a critical topic. There's no formal proof of
    correctness, but the algorithms are documented and analysed in [A Zero
    Rename Committer](https://github.com/steveloughran/zero-rename-committer/releases).
    That paper also reviews the classic v1 and v2 algorithms, IBM's Swift
    committer, and the one from EMRFS, which its authors admit was based on
    the concepts implemented here.
    
    Test-wise
    
    * There's a public set of Scala test suites [on github](https://github.com/hortonworks-spark/cloud-integration).
    * We have run integration tests against Spark on YARN clusters.
    * This code has been shipping for ~12 months in HDP-3.x.
    
    Closes #24970 from steveloughran/cloud/SPARK-23977-s3a-committer.
    
    Authored-by: Steve Loughran <ste...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../spark/internal/io/FileCommitProtocol.scala     |   6 +-
 .../io/HadoopMapReduceCommitProtocol.scala         |  54 +++++--
 docs/cloud-integration.md                          |  70 ++++++++-
 hadoop-cloud/pom.xml                               |  39 +++++
 .../io/cloud/BindingParquetOutputCommitter.scala   | 122 +++++++++++++++
 .../io/cloud/PathOutputCommitProtocol.scala        | 166 +++++++++++++++++++++
 .../internal/io/cloud/CommitterBindingSuite.scala  | 146 ++++++++++++++++++
 .../io/cloud/StubPathOutputCommitter.scala         | 120 +++++++++++++++
 hadoop-cloud/src/test/resources/log4j.properties   |  36 +++++
 9 files changed, 741 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 8540938..0746e43 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
  * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
  *    failed to execute (e.g. too many failed tasks), the job should call abortJob.
  */
-abstract class FileCommitProtocol {
+abstract class FileCommitProtocol extends Logging {
   import FileCommitProtocol._
 
   /**
@@ -129,7 +129,9 @@ abstract class FileCommitProtocol {
    * before the job has finished. These same task commit messages will be passed to commitJob()
    * if the entire job succeeds.
    */
-  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {}
+  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    logDebug(s"onTaskCommit($taskCommit)")
+  }
 }
 
 
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 7477e03..11ce608 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.internal.io
 
+import java.io.IOException
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
@@ -136,7 +137,7 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
+  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
     // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
@@ -205,11 +206,28 @@ class HadoopMapReduceCommitProtocol(
     }
   }
 
+  /**
+   * Abort the job; log and ignore any IO exception thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param jobContext job context
+   */
   override def abortJob(jobContext: JobContext): Unit = {
-    committer.abortJob(jobContext, JobStatus.State.FAILED)
-    if (hasValidPath) {
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-      fs.delete(stagingDir, true)
+    try {
+      committer.abortJob(jobContext, JobStatus.State.FAILED)
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
+    }
+    try {
+      if (hasValidPath) {
+        val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+        fs.delete(stagingDir, true)
+      }
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
     }
   }
 
@@ -222,17 +240,35 @@ class HadoopMapReduceCommitProtocol(
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
+    logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
     new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
 
+  /**
+   * Abort the task; log and ignore any failure thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param taskContext context
+   */
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    committer.abortTask(taskContext)
+    try {
+      committer.abortTask(taskContext)
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
+    }
     // best effort cleanup of other staged files
-    for ((src, _) <- addedAbsPathFiles) {
-      val tmp = new Path(src)
-      tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+    try {
+      for ((src, _) <- addedAbsPathFiles) {
+        val tmp = new Path(src)
+        tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+      }
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
     }
   }
 }
diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index b64ffe5..a8d40fe 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -125,7 +125,7 @@ consult the relevant documentation.
 ### Recommended settings for writing to object stores
 
 For object stores whose consistency model means that rename-based commits are safe
-use the `FileOutputCommitter` v2 algorithm for performance:
+use the `FileOutputCommitter` v2 algorithm for performance, or the v1 algorithm for safety.
 
 ```
 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
@@ -143,8 +143,30 @@ job failure:
 spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
 ```
 
+The original v1 commit algorithm renames the output of successful tasks
+to a job attempt directory, and then renames all the files in that directory
+into the final destination during the job commit phase:
+
+```
+spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
+```
+
+Because renames are only mimicked on Amazon S3 (as a copy followed by a
+delete), this algorithm is very slow there. The recommended solution is to
+switch to an S3 "Zero Rename" committer (see below).
+
+For reference, here are the performance and safety characteristics of
+different stores and connectors when renaming directories:
+
+| Store         | Connector | Directory Rename Safety | Rename Performance |
+|---------------|-----------|-------------------------|--------------------|
+| Amazon S3     | s3a       | Unsafe                  | O(data) |
+| Azure Storage | wasb      | Safe                    | O(files) |
+| Azure Datalake Gen 2 | abfs | Safe                  | O(1) |
+| Google GCS    | gs        | Safe                    | O(1) |
+
 As storing temporary files can run up charges, delete
-directories called `"_temporary"` on a regular basis to avoid this.
+directories called `"_temporary"` on a regular basis.
 
 ### Parquet I/O Settings
 
@@ -190,15 +212,49 @@ while they are still being written. Applications can write straight to the monit
 atomic `rename()` operation.
 Otherwise the checkpointing may be slow and potentially unreliable.
 
+## Committing work into cloud storage safely and fast
+
+As covered earlier, commit-by-rename is dangerous on any object store which
+exhibits eventual consistency (example: S3), and often slower than classic
+filesystem renames.
+
+Some object store connectors provide custom committers to commit tasks and
+jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
+the S3A connector for AWS S3 provides such committers.
+
+Instead of writing data to a temporary directory on the store for renaming,
+these committers write the files to the final destination, but do not issue
+the final POST command to make a large "multi-part" upload visible. Those
+operations are postponed until the job commit itself. As a result, task and
+job commit are much faster, and task failures do not affect the result. 
+
+To switch to the S3A committers, use a version of Spark that was built with
+Hadoop 3.1 or later, and switch the committers through the following options.
+
+```
+spark.hadoop.fs.s3a.committer.name directory
+spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
+spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
+```
+
+This committer binding has been tested with the most common output formats supported by Spark.
+
+```python
+mydataframe.write.format("parquet").save("s3a://bucket/destination")
+```
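+
+These options can also be set when constructing the `SparkSession`; a sketch
+(the bucket name is illustrative):
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder()
+  .config("spark.hadoop.fs.s3a.committer.name", "directory")
+  .config("spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
+  .config("spark.sql.parquet.output.committer.class",
+    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
+  .getOrCreate()
+
+spark.range(10).write.format("parquet").save("s3a://bucket/destination")
+```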
+
+More details on these committers can be found in the latest Hadoop documentation.
+
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
 
-* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). Hadoop 2.6+
-* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Since Hadoop 2.7
-* [Azure Data Lake](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). Since Hadoop 2.8
-* [Amazon S3 via S3A and S3N](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Hadoop 2.6+
+* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
+* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
+* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
+* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
+* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon
 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google
-
+* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
 
diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index dbf4b98..31c729c 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -198,6 +198,45 @@
     -->
     <profile>
       <id>hadoop-3.2</id>
+      <properties>
+        <extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
+        <extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
+      </properties>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${extra.source.dir}</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${extra.testsource.dir}</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
       <dependencies>
         <!--
        There's now a hadoop-cloud-storage which transitively pulls in the store JARs,
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
new file mode 100644
index 0000000..81a5738
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
+import org.apache.parquet.hadoop.ParquetOutputCommitter
+
+import org.apache.spark.internal.Logging
+
+/**
+ * This Parquet Committer subclass dynamically binds to the factory-configured
+ * output committer, and is intended to allow callers to use any 'PathOutputCommitter',
+ * even if not a subclass of 'ParquetOutputCommitter'.
+ *
+ * The Parquet `parquet.enable.summary-metadata` option will only be supported
+ * if the instantiated committer itself supports it.
+ */
+class BindingParquetOutputCommitter(
+    path: Path,
+    context: TaskAttemptContext)
+  extends ParquetOutputCommitter(path, context) with Logging {
+
+  logTrace(s"${this.getClass.getName} binding to configured 
PathOutputCommitter and dest $path")
+
+  private val committer = new BindingPathOutputCommitter(path, context)
+
+  /**
+   * This is the committer ultimately bound to.
+   * @return the committer instantiated by the factory.
+   */
+  private[cloud] def boundCommitter(): PathOutputCommitter = {
+    committer.getCommitter
+  }
+
+  override def getWorkPath(): Path = {
+    committer.getWorkPath()
+  }
+
+  override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    committer.setupTask(taskAttemptContext)
+  }
+
+  override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    committer.commitTask(taskAttemptContext)
+  }
+
+  override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    committer.abortTask(taskAttemptContext)
+  }
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    committer.setupJob(jobContext)
+  }
+
+  override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = {
+    committer.needsTaskCommit(taskAttemptContext)
+  }
+
+  override def cleanupJob(jobContext: JobContext): Unit = {
+    committer.cleanupJob(jobContext)
+  }
+
+  override def isCommitJobRepeatable(jobContext: JobContext): Boolean = {
+    committer.isCommitJobRepeatable(jobContext)
+  }
+
+  override def commitJob(jobContext: JobContext): Unit = {
+    committer.commitJob(jobContext)
+  }
+
+  override def recoverTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    committer.recoverTask(taskAttemptContext)
+  }
+
+  /**
+   * Abort the job; log and ignore any IO exception thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param jobContext job context
+   * @param state final state of the job
+   */
+  override def abortJob(jobContext: JobContext, state: JobStatus.State): Unit = {
+    try {
+      committer.abortJob(jobContext, state)
+    } catch {
+      case e: IOException =>
+        // swallow exception to avoid problems when called within exception
+        // handlers
+        logWarning("Abort job failed", e)
+    }
+  }
+
+  override def isRecoverySupported: Boolean = {
+    committer.isRecoverySupported()
+  }
+
+  override def isRecoverySupported(jobContext: JobContext): Boolean = {
+    committer.isRecoverySupported(jobContext)
+  }
+
+  override def toString: String = s"BindingParquetOutputCommitter($committer)"
+}
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
new file mode 100644
index 0000000..2ca5087
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
+
+import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+
+/**
+ * Spark Commit protocol for Path Output Committers.
+ * This committer will work with the `FileOutputCommitter` and subclasses.
+ * All implementations *must* be serializable.
+ *
+ * Rather than ask the `FileOutputFormat` for a committer, it uses the
+ * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory
+ * API to create the committer.
+ *
+ * In `setupCommitter` the factory is identified and instantiated;
+ * this factory then creates the actual committer implementation.
+ *
+ * @constructor Instantiate. Dynamic partition overwrite is not supported,
+ *              so that committers for stores which do not support rename
+ *              will not get confused.
+ * @param jobId                     job
+ * @param dest                      destination
+ * @param dynamicPartitionOverwrite does the caller want support for dynamic
+ *                                  partition overwrite. If so, it will be
+ *                                  refused.
+ */
+class PathOutputCommitProtocol(
+    jobId: String,
+    dest: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {
+
+  if (dynamicPartitionOverwrite) {
+    // until there's explicit extensions to the PathOutputCommitProtocols
+    // to support the spark mechanism, it's left to the individual committer
+    // choice to handle partitioning.
+    throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
+  }
+
+  /** The committer created. */
+  @transient private var committer: PathOutputCommitter = _
+
+  require(dest != null, "Null destination specified")
+
+  private[cloud] val destination: String = dest
+
+  /** The destination path. This is serializable in Hadoop 3. */
+  private[cloud] val destPath: Path = new Path(destination)
+
+  logTrace(s"Instantiated committer with job ID=$jobId;" +
+    s" destination=$destPath;" +
+    s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite")
+
+  import PathOutputCommitProtocol._
+
+  /**
+   * Set up the committer.
+   * This creates it by talking directly to the Hadoop factories, instead
+   * of the V1 `mapred.FileOutputFormat` methods.
+   * @param context task attempt
+   * @return the committer to use. This will always be a subclass of
+   *         `PathOutputCommitter`.
+   */
+  override protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = {
+    logTrace(s"Setting up committer for path $destination")
+    committer = PathOutputCommitterFactory.createCommitter(destPath, context)
+
+    // Special feature to force out the FileOutputCommitter, so as to guarantee
+    // that the binding is working properly.
+    val rejectFileOutput = context.getConfiguration
+      .getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL)
+    if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) {
+      // the output format returned a file output format committer, which
+      // is exactly what we do not want. So switch back to the factory.
+      val factory = PathOutputCommitterFactory.getCommitterFactory(
+        destPath,
+        context.getConfiguration)
+      logTrace(s"Using committer factory $factory")
+      committer = factory.createOutputCommitter(destPath, context)
+    }
+
+    logTrace(s"Using committer ${committer.getClass}")
+    logTrace(s"Committer details: $committer")
+    if (committer.isInstanceOf[FileOutputCommitter]) {
+      require(!rejectFileOutput,
+        s"Committer created is the FileOutputCommitter $committer")
+
+      if (committer.isCommitJobRepeatable(context)) {
+        // If FileOutputCommitter says its job commit is repeatable, it means
+        // it is using the v2 algorithm, which is not safe for task commit
+        // failures. Warn
+        logTrace(s"Committer $committer may not be tolerant of task commit 
failures")
+      }
+    }
+    committer
+  }
+
+  /**
+   * Create a temporary file for a task.
+   *
+   * @param taskContext task context
+   * @param dir         optional subdirectory
+   * @param ext         file extension
+   * @return a path as a string
+   */
+  override def newTaskTempFile(
+      taskContext: TaskAttemptContext,
+      dir: Option[String],
+      ext: String): String = {
+
+    val workDir = committer.getWorkPath
+    val parent = dir.map {
+      d => new Path(workDir, d)
+    }.getOrElse(workDir)
+    val file = new Path(parent, getFilename(taskContext, ext))
+    logTrace(s"Creating task file $file for dir $dir and ext $ext")
+    file.toString
+  }
+
+}
+
+object PathOutputCommitProtocol {
+
+  /**
+   * Hadoop configuration option.
+   * Fail fast if the committer is using the path output protocol.
+   * This option can be used to catch configuration issues early.
+   *
+   * It's mostly relevant when testing/diagnostics, as it can be used to
+   * enforce that schema-specific options are triggering a switch
+   * to a new committer.
+   */
+  val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput"
+
+  /**
+   * Default behavior: accept the file output.
+   */
+  val REJECT_FILE_OUTPUT_DEFVAL = false
+
+  /** Error string for tests. */
+  private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
+    " dynamicPartitionOverwrite"
+
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
new file mode 100644
index 0000000..546f542
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.lang.reflect.InvocationTargetException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.IOUtils
+import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.io.FileCommitProtocol
+
+class CommitterBindingSuite extends SparkFunSuite {
+
+  private val jobId = "2007071202143_0101"
+  private val taskAttempt0 = "attempt_" + jobId + "_m_000000_0"
+  private val taskAttemptId0 = TaskAttemptID.forName(taskAttempt0)
+
+  /**
+   * The classname to use when referring to the path output committer.
+   */
+  private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol].getName
+
+  /** hadoop-mapreduce option to enable the _SUCCESS marker. */
+  private val successMarker = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+
+  /**
+   * Does the
+   * [[BindingParquetOutputCommitter]] committer bind to the schema-specific
+   * committer declared for the destination path? And are lifecycle events
+   * correctly propagated?
+   */
+  test("BindingParquetOutputCommitter binds to the inner committer") {
+    val path = new Path("http://example/data")
+    val job = newJob(path)
+    val conf = job.getConfiguration
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
+
+    StubPathOutputCommitterFactory.bind(conf, "http")
+    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+    val parquet = new BindingParquetOutputCommitter(path, tContext)
+    val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter]
+    parquet.setupJob(tContext)
+    assert(inner.jobSetup, s"$inner job not setup")
+    parquet.setupTask(tContext)
+    assert(inner.taskSetup, s"$inner task not setup")
+    assert(parquet.needsTaskCommit(tContext), "needsTaskCommit false")
+    inner.needsTaskCommit = false
+    assert(!parquet.needsTaskCommit(tContext), "needsTaskCommit true")
+    parquet.commitTask(tContext)
+    assert(inner.taskCommitted, s"$inner task not committed")
+    parquet.abortTask(tContext)
+    assert(inner.taskAborted, s"$inner task not aborted")
+    parquet.commitJob(tContext)
+    assert(inner.jobCommitted, s"$inner job not committed")
+    parquet.abortJob(tContext, JobStatus.State.RUNNING)
+    assert(inner.jobAborted, s"$inner job not aborted")
+  }
+
+  /**
+   * Create a new job. Sets the task attempt ID.
+   *
+   * @return the new job
+   */
+  def newJob(outDir: Path): Job = {
+    val job = Job.getInstance(new Configuration())
+    val conf = job.getConfiguration
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
+    conf.setBoolean(successMarker, true)
+    FileOutputFormat.setOutputPath(job, outDir)
+    job
+  }
+
+  test("committer protocol can be serialized and deserialized") {
+    val tempDir = File.createTempFile("ser", ".bin")
+
+    tempDir.delete()
+    val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false)
+
+    val serData = File.createTempFile("ser", ".bin")
+    var out: ObjectOutputStream = null
+    var in: ObjectInputStream = null
+
+    try {
+      out = new ObjectOutputStream(new FileOutputStream(serData))
+      out.writeObject(committer)
+      out.close
+      in = new ObjectInputStream(new FileInputStream(serData))
+      val result = in.readObject()
+
+      val committer2 = result.asInstanceOf[PathOutputCommitProtocol]
+
+      assert(committer.destination === committer2.destination,
+        "destination mismatch on round trip")
+      assert(committer.destPath === committer2.destPath,
+        "destPath mismatch on round trip")
+    } finally {
+      IOUtils.closeStreams(out, in)
+      serData.delete()
+    }
+  }
+
+  test("local filesystem instantiation") {
+    val instance = FileCommitProtocol.instantiate(
+      pathCommitProtocolClassname,
+      jobId, "file:///tmp", false)
+
+    val protocol = instance.asInstanceOf[PathOutputCommitProtocol]
+    assert("file:///tmp" === protocol.destination)
+  }
+
+  test("reject dynamic partitioning") {
+    val cause = intercept[InvocationTargetException] {
+      FileCommitProtocol.instantiate(
+        pathCommitProtocolClassname,
+        jobId, "file:///tmp", true)
+    }.getCause
+    if (cause == null || !cause.isInstanceOf[IOException]
+        || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
+      throw cause
+    }
+  }
+
+}
+
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
new file mode 100644
index 0000000..88a36d2
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}
+
+/**
+ * A local path output committer which tracks its state, for use in tests.
+ * @param outputPath final destination.
+ * @param workPath work path
+ * @param context task/job attempt.
+ */
+class StubPathOutputCommitter(
+    outputPath: Path,
+    workPath: Path,
+    context: TaskAttemptContext) extends PathOutputCommitter(workPath, context) {
+
+  var jobSetup: Boolean = false
+  var jobCommitted: Boolean = false
+  var jobAborted: Boolean = false
+
+  var taskSetup: Boolean = false
+  var taskCommitted: Boolean = false
+  var taskAborted: Boolean = false
+  var needsTaskCommit: Boolean = true
+
+  override def getOutputPath: Path = outputPath
+
+  override def getWorkPath: Path = {
+    workPath
+  }
+
+  override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    taskSetup = true
+  }
+
+  override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    taskAborted = true
+  }
+
+  override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
+    taskCommitted = true
+  }
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    jobSetup = true
+  }
+
+  override def commitJob(jobContext: JobContext): Unit = {
+    jobCommitted = true
+  }
+
+  override def abortJob(
+      jobContext: JobContext,
+      state: JobStatus.State): Unit = {
+    jobAborted = true
+  }
+
+  override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = {
+    needsTaskCommit
+  }
+
+  override def toString(): String = s"StubPathOutputCommitter(setup=$jobSetup," +
+    s" committed=$jobCommitted, aborted=$jobAborted)"
+}
+
+class StubPathOutputCommitterFactory extends PathOutputCommitterFactory {
+
+  override def createOutputCommitter(
+      outputPath: Path,
+      context: TaskAttemptContext): PathOutputCommitter = {
+    new StubPathOutputCommitter(outputPath, workPath(outputPath), context)
+  }
+
+  private def workPath(out: Path): Path = new Path(out,
+    StubPathOutputCommitterFactory.TEMP_DIR_NAME)
+}
+
+object StubPathOutputCommitterFactory {
+
+  /**
+   * This is the "Pending" directory of the FileOutputCommitter;
+   * data written here is, in that algorithm, renamed into place.
+   */
+  val TEMP_DIR_NAME = "_temporary"
+
+  /**
+   * Scheme prefix for per-filesystem scheme committers.
+   */
+  val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
+
+  /**
+   * Given a hadoop configuration, set up the factory binding for the scheme.
+   * @param conf config to patch
+   * @param scheme filesystem scheme.
+   */
+  def bind(conf: Configuration, scheme: String): Unit = {
+    val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme
+    conf.set(key, classOf[StubPathOutputCommitterFactory].getName())
+  }
+
+}
diff --git a/hadoop-cloud/src/test/resources/log4j.properties b/hadoop-cloud/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb9d985
--- /dev/null
+++ b/hadoop-cloud/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+test.appender=file
+log4j.rootCategory=INFO, ${test.appender}
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark_project.jetty=WARN


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
