This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2ac6163 [SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2] 2ac6163 is described below commit 2ac6163a5d04027ef4dbdf7d031cddf9415ed25e Author: Steve Loughran <ste...@cloudera.com> AuthorDate: Thu Aug 15 09:38:31 2019 -0700 [SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2] This patch adds the binding classes to enable Spark to switch dataframe output to using the S3A zero-rename committers shipping in Hadoop 3.1+. It adds a source tree into the hadoop-cloud-storage module which only compiles with the hadoop-3.2 profile, and contains a binding for normal output and a specific bridge class for Parquet (as the parquet output format requires a subclass of `ParquetOutputCommitter`). Commit algorithms are a critical topic. There's no formal proof of correctness, but the algorithms are documented and analysed in [A Zero Rename Committer](https://github.com/steveloughran/zero-rename-committer/releases). This also reviews the classic v1 and v2 algorithms, IBM's Swift committer, and the one from EMRFS, which they admit was based on the concepts implemented here. Test-wise: * There's a public set of Scala test suites [on github](https://github.com/hortonworks-spark/cloud-integration) * We have run integration tests against Spark on YARN clusters. * This code has been shipping for ~12 months in HDP-3.x. Closes #24970 from steveloughran/cloud/SPARK-23977-s3a-committer. Authored-by: Steve Loughran <ste...@cloudera.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../spark/internal/io/FileCommitProtocol.scala | 6 +- .../io/HadoopMapReduceCommitProtocol.scala | 54 +++++-- docs/cloud-integration.md | 70 ++++++++- hadoop-cloud/pom.xml | 39 +++++ .../io/cloud/BindingParquetOutputCommitter.scala | 122 +++++++++++++++ .../io/cloud/PathOutputCommitProtocol.scala | 166 +++++++++++++++++++++ .../internal/io/cloud/CommitterBindingSuite.scala | 146 ++++++++++++++++++ .../io/cloud/StubPathOutputCommitter.scala | 120 +++++++++++++++ hadoop-cloud/src/test/resources/log4j.properties | 36 +++++ 9 files changed, 741 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 8540938..0746e43 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.Utils * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job * failed to execute (e.g. too many failed tasks), the job should call abortJob. */ -abstract class FileCommitProtocol { +abstract class FileCommitProtocol extends Logging { import FileCommitProtocol._ /** @@ -129,7 +129,9 @@ abstract class FileCommitProtocol { * before the job has finished. These same task commit messages will be passed to commitJob() * if the entire job succeeds.
*/ - def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {} + def onTaskCommit(taskCommit: TaskCommitMessage): Unit = { + logDebug(s"onTaskCommit($taskCommit)") + } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 7477e03..11ce608 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -17,6 +17,7 @@ package org.apache.spark.internal.io +import java.io.IOException import java.util.{Date, UUID} import scala.collection.mutable @@ -136,7 +137,7 @@ class HadoopMapReduceCommitProtocol( tmpOutputPath } - private def getFilename(taskContext: TaskAttemptContext, ext: String): String = { + protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = { // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, // the file name is fine and won't overflow. @@ -205,11 +206,28 @@ class HadoopMapReduceCommitProtocol( } } + /** + * Abort the job; log and ignore any IO exception thrown. + * This is invariably invoked in an exception handler; raising + * an exception here will lose the root cause of the failure. + * + * @param jobContext job context + */ override def abortJob(jobContext: JobContext): Unit = { - committer.abortJob(jobContext, JobStatus.State.FAILED) - if (hasValidPath) { - val fs = stagingDir.getFileSystem(jobContext.getConfiguration) - fs.delete(stagingDir, true) + try { + committer.abortJob(jobContext, JobStatus.State.FAILED) + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${jobContext.getJobID}", e) + } + try { + if (hasValidPath) { + val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + fs.delete(stagingDir, true) + } + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${jobContext.getJobID}", e) } } @@ -222,17 +240,35 @@ class HadoopMapReduceCommitProtocol( override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { val attemptId = taskContext.getTaskAttemptID + logTrace(s"Commit task ${attemptId}") SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet) } + /** + * Abort the task; log and ignore any failure thrown. + * This is invariably invoked in an exception handler; raising + * an exception here will lose the root cause of the failure. 
+ * + * @param taskContext context + */ override def abortTask(taskContext: TaskAttemptContext): Unit = { - committer.abortTask(taskContext) + try { + committer.abortTask(taskContext) + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e) + } // best effort cleanup of other staged files - for ((src, _) <- addedAbsPathFiles) { - val tmp = new Path(src) - tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false) + try { + for ((src, _) <- addedAbsPathFiles) { + val tmp = new Path(src) + tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false) + } + } catch { + case e: IOException => + logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e) } } } diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md index b64ffe5..a8d40fe 100644 --- a/docs/cloud-integration.md +++ b/docs/cloud-integration.md @@ -125,7 +125,7 @@ consult the relevant documentation. ### Recommended settings for writing to object stores For object stores whose consistency model means that rename-based commits are safe -use the `FileOutputCommitter` v2 algorithm for performance: +use the `FileOutputCommitter` v2 algorithm for performance; v1 for safety. ``` spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 ``` @@ -143,8 +143,30 @@ job failure: spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true ``` +The original v1 commit algorithm renames the output of successful tasks +to a job attempt directory, and then renames all the files in that directory +into the final destination during the job commit phase: + +``` +spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 +``` + +The poor performance of mimicked renames on Amazon S3 makes this algorithm +very, very slow. The recommended solution to this is to switch to an S3 "Zero Rename" +committer (see below). + +For reference, here are the performance and safety characteristics of +different stores and connectors when renaming directories: + +| Store | Connector | Directory Rename Safety | Rename Performance | +|---------------|-----------|-------------------------|--------------------| +| Amazon S3 | s3a | Unsafe | O(data) | +| Azure Storage | wasb | Safe | O(files) | +| Azure Datalake Gen 2 | abfs | Safe | O(1) | +| Google GCS | gs | Safe | O(1) | + As storing temporary files can run up charges; delete -directories called `"_temporary"` on a regular basis to avoid this. +directories called `"_temporary"` on a regular basis. ### Parquet I/O Settings @@ -190,15 +212,49 @@ while they are still being written. Applications can write straight to the monit atomic `rename()` operation. Otherwise the checkpointing may be slow and potentially unreliable. +## Committing work into cloud storage safely and fast. + +As covered earlier, commit-by-rename is dangerous on any object store which +exhibits eventual consistency (example: S3), and often slower than classic +filesystem renames. + +Some object store connectors provide custom committers to commit tasks and +jobs without using rename. In versions of Spark built with Hadoop 3.1 or later, +the S3A connector for AWS S3 provides such committers. + +Instead of writing data to a temporary directory on the store for renaming, +these committers write the files to the final destination, but do not issue +the final POST command to make a large "multi-part" upload visible. Those +operations are postponed until the job commit itself. As a result, task and +job commit are much faster, and task failures do not affect the result. + +To switch to the S3A committers, use a version of Spark that was built with Hadoop +3.1 or later, and switch the committers through the following options. + +``` +spark.hadoop.fs.s3a.committer.name directory +spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol +spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter +``` + +These committers have been tested with the most common formats supported by Spark. + +```python +mydataframe.write.format("parquet").save("s3a://bucket/destination") +``` + +More details on these committers can be found in the latest Hadoop documentation.
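For illustration, the same options can also be set programmatically when the `SparkSession` is created rather than in `spark-defaults.conf`. This is a minimal sketch, assuming a Spark build with the `hadoop-3.2` profile and the hadoop-cloud module on the classpath; the application name, bucket and output path are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: route DataFrame output through the S3A "directory" committer.
// Assumes a Spark build with the hadoop-3.2 profile; bucket and paths are placeholders.
val spark = SparkSession.builder()
  .appName("S3ACommitterExample")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

// Parquet writes now commit through the zero-rename path instead of directory renames.
spark.range(1000).toDF("id")
  .write
  .format("parquet")
  .save("s3a://example-bucket/output/ids")
```

In deployments these options are usually set once in `spark-defaults.conf`, as shown above, rather than per application.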
+ ## Further Reading Here is the documentation on the standard connectors both from Apache and the cloud providers. -* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). Hadoop 2.6+ -* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Since Hadoop 2.7 -* [Azure Data Lake](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). Since Hadoop 2.8 -* [Amazon S3 via S3A and S3N](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Hadoop 2.6+ +* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). +* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). +* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). +* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html). +* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector).
From Google - +* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver) diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index dbf4b98..31c729c 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -198,6 +198,45 @@ --> <profile> <id>hadoop-3.2</id> + <properties> + <extra.source.dir>src/hadoop-3/main/scala</extra.source.dir> + <extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-scala-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>${extra.source.dir}</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-scala-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>${extra.testsource.dir}</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> <dependencies> <!-- There's now a hadoop-cloud-storage which transitively pulls in the store JARs, diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala new file mode 100644 index 0000000..81a5738 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} +import org.apache.parquet.hadoop.ParquetOutputCommitter + +import org.apache.spark.internal.Logging + +/** + * This Parquet Committer subclass dynamically binds to the factory-configured + * output committer, and is intended to allow callers to use any 'PathOutputCommitter', + * even if not a subclass of 'ParquetOutputCommitter'. + * + * The Parquet `parquet.enable.summary-metadata` option will only be supported + * if the instantiated committer itself supports it. 
+ */ +class BindingParquetOutputCommitter( + path: Path, + context: TaskAttemptContext) + extends ParquetOutputCommitter(path, context) with Logging { + + logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path") + + private val committer = new BindingPathOutputCommitter(path, context) + + /** + * This is the committer ultimately bound to. + * @return the committer instantiated by the factory. + */ + private[cloud] def boundCommitter(): PathOutputCommitter = { + committer.getCommitter + } + + override def getWorkPath(): Path = { + committer.getWorkPath() + } + + override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.setupTask(taskAttemptContext) + } + + override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.commitTask(taskAttemptContext) + } + + override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.abortTask(taskAttemptContext) + } + + override def setupJob(jobContext: JobContext): Unit = { + committer.setupJob(jobContext) + } + + override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = { + committer.needsTaskCommit(taskAttemptContext) + } + + override def cleanupJob(jobContext: JobContext): Unit = { + committer.cleanupJob(jobContext) + } + + override def isCommitJobRepeatable(jobContext: JobContext): Boolean = { + committer.isCommitJobRepeatable(jobContext) + } + + override def commitJob(jobContext: JobContext): Unit = { + committer.commitJob(jobContext) + } + + override def recoverTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.recoverTask(taskAttemptContext) + } + + /** + * Abort the job; log and ignore any IO exception thrown. + * This is invariably invoked in an exception handler; raising + * an exception here will lose the root cause of the failure. + * + * @param jobContext job context + * @param state final state of the job + */ + override def abortJob(jobContext: JobContext, state: JobStatus.State): Unit = { + try { + committer.abortJob(jobContext, state) + } catch { + case e: IOException => + // swallow exception to avoid problems when called within exception + // handlers + logWarning("Abort job failed", e) + } + } + + override def isRecoverySupported: Boolean = { + committer.isRecoverySupported() + } + + override def isRecoverySupported(jobContext: JobContext): Boolean = { + committer.isRecoverySupported(jobContext) + } + + override def toString: String = s"BindingParquetOutputCommitter($committer)" +} diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala new file mode 100644 index 0000000..2ca5087 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. + * + * In `setupCommitter` the factory is identified and instantiated; + * this factory then creates the actual committer implementation. + * + * @constructor Instantiate. dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param dest destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + */ +class PathOutputCommitProtocol( + jobId: String, + dest: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable { + + if (dynamicPartitionOverwrite) { + // until there's explicit extensions to the PathOutputCommitProtocols + // to support the spark mechanism, it's left to the individual committer + // choice to handle partitioning. + throw new IOException(PathOutputCommitProtocol.UNSUPPORTED) + } + + /** The committer created. */ + @transient private var committer: PathOutputCommitter = _ + + require(dest != null, "Null destination specified") + + private[cloud] val destination: String = dest + + /** The destination path. This is serializable in Hadoop 3. */ + private[cloud] val destPath: Path = new Path(destination) + + logTrace(s"Instantiated committer with job ID=$jobId;" + + s" destination=$destPath;" + + s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. This will always be a subclass of + * `PathOutputCommitter`. + */ + override protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = { + logTrace(s"Setting up committer for path $destination") + committer = PathOutputCommitterFactory.createCommitter(destPath, context) + + // Special feature to force out the FileOutputCommitter, so as to guarantee + // that the binding is working properly. 
+ val rejectFileOutput = context.getConfiguration + .getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL) + if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) { + // the output format returned a file output format committer, which + // is exactly what we do not want. So switch back to the factory. + val factory = PathOutputCommitterFactory.getCommitterFactory( + destPath, + context.getConfiguration) + logTrace(s"Using committer factory $factory") + committer = factory.createOutputCommitter(destPath, context) + } + + logTrace(s"Using committer ${committer.getClass}") + logTrace(s"Committer details: $committer") + if (committer.isInstanceOf[FileOutputCommitter]) { + require(!rejectFileOutput, + s"Committer created is the FileOutputCommitter $committer") + + if (committer.isCommitJobRepeatable(context)) { + // If FileOutputCommitter says its job commit is repeatable, it means + // it is using the v2 algorithm, which is not safe for task commit + // failures. Warn + logTrace(s"Committer $committer may not be tolerant of task commit failures") + } + } + committer + } + + /** + * Create a temporary file for a task. + * + * @param taskContext task context + * @param dir optional subdirectory + * @param ext file extension + * @return a path as a string + */ + override def newTaskTempFile( + taskContext: TaskAttemptContext, + dir: Option[String], + ext: String): String = { + + val workDir = committer.getWorkPath + val parent = dir.map { + d => new Path(workDir, d) + }.getOrElse(workDir) + val file = new Path(parent, getFilename(taskContext, ext)) + logTrace(s"Creating task file $file for dir $dir and ext $ext") + file.toString + } + +} + +object PathOutputCommitProtocol { + + /** + * Hadoop configuration option. + * Fail fast if the committer is using the path output protocol. + * This option can be used to catch configuration issues early. + * + * It's mostly relevant when testing/diagnostics, as it can be used to + * enforce that schema-specific options are triggering a switch + * to a new committer. + */ + val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput" + + /** + * Default behavior: accept the file output. + */ + val REJECT_FILE_OUTPUT_DEFVAL = false + + /** Error string for tests. */ + private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" + + " dynamicPartitionOverwrite" + +} diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala new file mode 100644 index 0000000..546f542 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.lang.reflect.InvocationTargetException + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.IOUtils +import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID} +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.io.FileCommitProtocol + +class CommitterBindingSuite extends SparkFunSuite { + + private val jobId = "2007071202143_0101" + private val taskAttempt0 = "attempt_" + jobId + "_m_000000_0" + private val taskAttemptId0 = TaskAttemptID.forName(taskAttempt0) + + /** + * The classname to use when referring to the path output committer. + */ + private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol].getName + + /** hadoop-mapreduce option to enable the _SUCCESS marker. */ + private val successMarker = "mapreduce.fileoutputcommitter.marksuccessfuljobs" + + /** + * Does the + * [[BindingParquetOutputCommitter]] committer bind to the schema-specific + * committer declared for the destination path? And are lifecycle events + * correctly propagated? + */ + test("BindingParquetOutputCommitter binds to the inner committer") { + val path = new Path("http://example/data") + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + + StubPathOutputCommitterFactory.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + val parquet = new BindingParquetOutputCommitter(path, tContext) + val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter] + parquet.setupJob(tContext) + assert(inner.jobSetup, s"$inner job not setup") + parquet.setupTask(tContext) + assert(inner.taskSetup, s"$inner task not setup") + assert(parquet.needsTaskCommit(tContext), "needsTaskCommit false") + inner.needsTaskCommit = false + assert(!parquet.needsTaskCommit(tContext), "needsTaskCommit true") + parquet.commitTask(tContext) + assert(inner.taskCommitted, s"$inner task not committed") + parquet.abortTask(tContext) + assert(inner.taskAborted, s"$inner task not aborted") + parquet.commitJob(tContext) + assert(inner.jobCommitted, s"$inner job not committed") + parquet.abortJob(tContext, JobStatus.State.RUNNING) + assert(inner.jobAborted, s"$inner job not aborted") + } + + /** + * Create a new job. Sets the task attempt ID.
+ * + * @return the new job + */ + def newJob(outDir: Path): Job = { + val job = Job.getInstance(new Configuration()) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) + conf.setBoolean(successMarker, true) + FileOutputFormat.setOutputPath(job, outDir) + job + } + + test("committer protocol can be serialized and deserialized") { + val tempDir = File.createTempFile("ser", ".bin") + + tempDir.delete() + val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false) + + val serData = File.createTempFile("ser", ".bin") + var out: ObjectOutputStream = null + var in: ObjectInputStream = null + + try { + out = new ObjectOutputStream(new FileOutputStream(serData)) + out.writeObject(committer) + out.close + in = new ObjectInputStream(new FileInputStream(serData)) + val result = in.readObject() + + val committer2 = result.asInstanceOf[PathOutputCommitProtocol] + + assert(committer.destination === committer2.destination, + "destination mismatch on round trip") + assert(committer.destPath === committer2.destPath, + "destPath mismatch on round trip") + } finally { + IOUtils.closeStreams(out, in) + serData.delete() + } + } + + test("local filesystem instantiation") { + val instance = FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, "file:///tmp", false) + + val protocol = instance.asInstanceOf[PathOutputCommitProtocol] + assert("file:///tmp" === protocol.destination) + } + + test("reject dynamic partitioning") { + val cause = intercept[InvocationTargetException] { + FileCommitProtocol.instantiate( + pathCommitProtocolClassname, + jobId, "file:///tmp", true) + }.getCause + if (cause == null || !cause.isInstanceOf[IOException] + || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { + throw cause + } + } + +} + diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala new file mode 100644 index 0000000..88a36d2 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory} + +/** + * A local path output committer which tracks its state, for use in tests. + * @param outputPath final destination. + * @param workPath work path + * @param context task/job attempt. 
+ */ +class StubPathOutputCommitter( + outputPath: Path, + workPath: Path, + context: TaskAttemptContext) extends PathOutputCommitter(workPath, context) { + + var jobSetup: Boolean = false + var jobCommitted: Boolean = false + var jobAborted: Boolean = false + + var taskSetup: Boolean = false + var taskCommitted: Boolean = false + var taskAborted: Boolean = false + var needsTaskCommit: Boolean = true + + override def getOutputPath: Path = outputPath + + override def getWorkPath: Path = { + workPath + } + + override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = { + taskSetup = true + } + + override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = { + taskAborted = true + } + + override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = { + taskCommitted = true + } + + override def setupJob(jobContext: JobContext): Unit = { + jobSetup = true + } + + override def commitJob(jobContext: JobContext): Unit = { + jobCommitted = true + } + + override def abortJob( + jobContext: JobContext, + state: JobStatus.State): Unit = { + jobAborted = true + } + + override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = { + needsTaskCommit + } + + override def toString(): String = s"StubPathOutputCommitter(setup=$jobSetup," + + s" committed=$jobCommitted, aborted=$jobAborted)" +} + +class StubPathOutputCommitterFactory extends PathOutputCommitterFactory { + + override def createOutputCommitter( + outputPath: Path, + context: TaskAttemptContext): PathOutputCommitter = { + new StubPathOutputCommitter(outputPath, workPath(outputPath), context) + } + + private def workPath(out: Path): Path = new Path(out, + StubPathOutputCommitterFactory.TEMP_DIR_NAME) +} + +object StubPathOutputCommitterFactory { + + /** + * This is the "Pending" directory of the FileOutputCommitter; + * data written here is, in that algorithm, renamed into place. + */ + val TEMP_DIR_NAME = "_temporary" + + /** + * Scheme prefix for per-filesystem scheme committers. + */ + val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + + /** + * Given a hadoop configuration, set up the factory binding for the scheme. + * @param conf config to patch + * @param scheme filesystem scheme. + */ + def bind(conf: Configuration, scheme: String): Unit = { + val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme + conf.set(key, classOf[StubPathOutputCommitterFactory].getName()) + } + +} diff --git a/hadoop-cloud/src/test/resources/log4j.properties b/hadoop-cloud/src/test/resources/log4j.properties new file mode 100644 index 0000000..fb9d985 --- /dev/null +++ b/hadoop-cloud/src/test/resources/log4j.properties @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Set everything to be logged to the file target/unit-tests.log +test.appender=file +log4j.rootCategory=INFO, ${test.appender} +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Tests that launch java subprocesses can set the "test.appender" system property to +# "console" to avoid having the child process's logs overwrite the unit test's +# log file. +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark_project.jetty=WARN