This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 57aa3d1ca6e Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"
57aa3d1ca6e is described below

commit 57aa3d1ca6e17e4c6b934d74176ea22ca56d60f7
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue Feb 28 09:27:00 2023 -0800

    Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"

    This reverts commit 5a599dec507786139fb2ecb7ce1a44c83fd06b0d.
---
 docs/cloud-integration.md                          | 112 +--------------
 .../io/cloud/BindingParquetOutputCommitter.scala   |   8 +-
 .../io/cloud/PathOutputCommitProtocol.scala        |  86 ++----------
 .../internal/io/cloud/CommitterBindingSuite.scala  | 155 +++------------------
 .../io/cloud/StubPathOutputCommitter.scala         |  60 ++------
 5 files changed, 47 insertions(+), 374 deletions(-)

diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index 06342645e6d..991ba69c8cb 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -248,13 +248,8 @@ exhibits eventual consistency (example: S3), and often
 slower than classic filesystem renames.
 
 Some object store connectors provide custom committers to commit tasks and
-jobs without using rename.
-
-### Hadoop S3A committers
-
-In versions of Spark built with Hadoop 3.1 or later,
-the hadoop-aws JAR contains committers safe to use for S3 storage
-accessed via the s3a connector.
+jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
+the S3A connector for AWS S3 is such a committer.
 
 Instead of writing data to a temporary directory on the store for renaming,
 these committers write the files to the final destination, but do not issue
@@ -277,111 +272,22 @@ It has been tested with the most common formats supported by Spark.
 mydataframe.write.format("parquet").save("s3a://bucket/destination")
 ```
 
-More details on these committers can be found in
-[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/)
-with S3A committer detail covered in
-[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html).
+More details on these committers can be found in the latest Hadoop documentation.
 
 Note: depending upon the committer used, in-progress statistics may be
 under-reported with Hadoop versions before 3.3.1.
 
-### Amazon EMR: the EMRFS S3-optimized committer
-
-Amazon EMR has its own S3-aware committers for parquet data.
-For instructions on use, see
-[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
-
-For implementation and performance details, see
-["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
-
-### Azure and Google cloud storage: MapReduce Intermediate Manifest Committer
-
-Versions of the hadoop-mapreduce-core JAR shipped after September 2022 (3.3.5 and later)
-contain a committer optimized for performance and resilience on
-Azure ADLS Generation 2 and Google Cloud Storage.
-
-This committer, the "manifest committer", uses a manifest file to propagate
-directory listing information from the task committers to the job committer.
-These manifests can be written atomically, without relying on atomic directory rename,
-something GCS lacks.
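The committer binding described in this documentation is wired up purely through configuration. Below is a minimal sketch of doing so for S3A from a Spark session; it assumes spark-hadoop-cloud and hadoop-aws (Hadoop 3.1+) are on the classpath, the bucket name is a placeholder, and the property names follow the Hadoop S3A committer and Spark cloud-integration documentation rather than anything introduced by this commit.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming spark-hadoop-cloud and hadoop-aws are on the classpath;
// "bucket" is a placeholder destination.
val spark = SparkSession.builder()
  .appName("s3a-committer-binding")
  // hand s3a:// output to the S3A committer factory instead of the
  // rename-based FileOutputCommitter
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  // bind Spark's commit protocol and the parquet committer to the cloud-aware classes
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

// same write pattern as the documentation example above
spark.range(10).write.format("parquet").save("s3a://bucket/destination")
```

With `PathOutputCommitProtocol` bound this way, the revert in this commit means that a write requesting dynamic partition overwrite fails when the protocol is instantiated, with the message `PathOutputCommitProtocol does not support dynamicPartitionOverwrite`, rather than the committer being probed for compatibility.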
-
-The job committer reads these manifests and will rename files from the task output
-directories directly into the destination directory, in parallel, with optional
-rate limiting to avoid throttling IO.
-This delivers performance and scalability on the object stores.
-
-It is not critical for job correctness to use this with Azure storage; the
-classic FileOutputCommitter is safe there; however, this new committer scales
-better for large jobs with deep and wide directory trees.
-
-Because Google GCS does not support atomic directory renaming,
-the manifest committer should be used where available.
-
-This committer does support "dynamic partition overwrite" (see below).
-
-For details on availability and use of this committer, consult
-the Hadoop documentation for the Hadoop release used.
-
-It is not available on Hadoop 3.3.4 or earlier.
-
-### IBM Cloud Object Storage: Stocator
-
-IBM provides the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.
-
-Source, documentation and releases can be found at
-[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).
-
-
-## Cloud Committers and `INSERT OVERWRITE TABLE`
-
-Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
-partitions into which new data is added will have their contents replaced.
-
-This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
-and when Datasets are written in mode "overwrite".
-
-{% highlight scala %}
-eventDataset.write
-  .mode("overwrite")
-  .partitionBy("year", "month")
-  .format("parquet")
-  .save(tablePath)
-{% endhighlight %}
-
-This feature uses file renaming and has specific requirements of
-both the committer and the filesystem:
-
-1. The committer's working directory must be in the destination filesystem.
-2. The target filesystem must support file rename efficiently.
-
-These conditions are _not_ met by the S3A committers and AWS S3 storage.
-
-Committers for other cloud stores _may_ support this feature, and
-declare to Spark that they are compatible. If dynamic partition overwrite
-is required when writing data through a Hadoop committer, Spark
-will always permit this when the original `FileOutputCommitter`
-is used. For other committers, after their instantiation, Spark
-will probe for their declaration of compatibility, and
-permit the operation if they state that they are compatible.
-
-If the committer is not compatible, the operation will fail with
-the error message
-`PathOutputCommitter does not support dynamicPartitionOverwrite`
-
-Unless there is a compatible committer for the target filesystem,
-the sole solution is to use a cloud-friendly format for data
-storage.
-
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
+* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
 * [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
 * [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
 * [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
 * [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
 * [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
+* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon. * [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html) * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google. @@ -389,11 +295,3 @@ Here is the documentation on the standard connectors both from Apache and the cl * IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM. * [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs). - -The Cloud Committer problem and hive-compatible solutions -* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html) -* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/) -* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md) -* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/). -* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812) - diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala index 1e740a6e778..81a57385dd9 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala @@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import org.apache.hadoop.fs.{Path, StreamCapabilities} +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} import org.apache.parquet.hadoop.ParquetOutputCommitter @@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging class BindingParquetOutputCommitter( path: Path, context: TaskAttemptContext) - extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities { + extends ParquetOutputCommitter(path, context) with Logging { logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path") @@ -119,8 +119,4 @@ class BindingParquetOutputCommitter( } override def toString: String = s"BindingParquetOutputCommitter($committer)" - - override def hasCapability(capability: String): Boolean = - committer.isInstanceOf[StreamCapabilities] && - committer.asInstanceOf[StreamCapabilities].hasCapability(capability) } diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 44a521bd636..fc5d0a0b3a7 100644 --- 
a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud import java.io.IOException -import org.apache.hadoop.fs.{Path, StreamCapabilities} +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} @@ -38,28 +38,27 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol * In `setupCommitter` the factory is identified and instantiated; * this factory then creates the actual committer implementation. * - * Dynamic Partition support will be determined once the committer is - * instantiated in the setupJob/setupTask methods. If this - * class was instantiated with `dynamicPartitionOverwrite` set to true, - * then the instantiated committer must either be an instance of - * `FileOutputCommitter` or it must implement the `StreamCapabilities` - * interface and declare that it has the capability - * `mapreduce.job.committer.dynamic.partitioning`. - * That feature is available on Hadoop releases with the Intermediate - * Manifest Committer for GCS and ABFS; it is not supported by the - * S3A committers. - * @constructor Instantiate. + * @constructor Instantiate. dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. * @param jobId job * @param dest destination * @param dynamicPartitionOverwrite does the caller want support for dynamic - * partition overwrite? + * partition overwrite. If so, it will be + * refused. */ class PathOutputCommitProtocol( jobId: String, dest: String, dynamicPartitionOverwrite: Boolean = false) - extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) - with Serializable { + extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable { + + if (dynamicPartitionOverwrite) { + // until there's explicit extensions to the PathOutputCommitProtocols + // to support the spark mechanism, it's left to the individual committer + // choice to handle partitioning. + throw new IOException(PathOutputCommitProtocol.UNSUPPORTED) + } /** The committer created. */ @transient private var committer: PathOutputCommitter = _ @@ -115,33 +114,10 @@ class PathOutputCommitProtocol( // failures. Warn logTrace(s"Committer $committer may not be tolerant of task commit failures") } - } else { - // if required other committers need to be checked for dynamic partition - // compatibility through a StreamCapabilities probe. - if (dynamicPartitionOverwrite) { - if (supportsDynamicPartitions) { - logDebug( - s"Committer $committer has declared compatibility with dynamic partition overwrite") - } else { - throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer) - } - } } committer } - - /** - * Does the instantiated committer support dynamic partitions? - * @return true if the committer declares itself compatible. - */ - private def supportsDynamicPartitions = { - committer.isInstanceOf[FileOutputCommitter] || - (committer.isInstanceOf[StreamCapabilities] && - committer.asInstanceOf[StreamCapabilities] - .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)) - } - /** * Create a temporary file for a task. 
* @@ -164,28 +140,6 @@ class PathOutputCommitProtocol( file.toString } - /** - * Reject any requests for an absolute path file on a committer which - * is not compatible with it. - * - * @param taskContext task context - * @param absoluteDir final directory - * @param spec output filename - * @return a path string - * @throws UnsupportedOperationException if incompatible - */ - override def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, - absoluteDir: String, - spec: FileNameSpec): String = { - - if (supportsDynamicPartitions) { - super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec) - } else { - throw new UnsupportedOperationException(s"Absolute output locations not supported" + - s" by committer $committer") - } - } } object PathOutputCommitProtocol { @@ -207,17 +161,7 @@ object PathOutputCommitProtocol { val REJECT_FILE_OUTPUT_DEFVAL = false /** Error string for tests. */ - private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" + + private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" + " dynamicPartitionOverwrite" - /** - * Stream Capabilities probe for spark dynamic partitioning compatibility. - */ - private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING = - "mapreduce.job.committer.dynamic.partitioning" - - /** - * Scheme prefix for per-filesystem scheme committers. - */ - private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala index 984c7dbc2cb..546f54229ea 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -18,17 +18,17 @@ package org.apache.spark.internal.io.cloud import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream} +import java.lang.reflect.InvocationTargetException import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, StreamCapabilities} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.IOUtils -import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID} -import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat} +import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID} +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} +import org.apache.spark.internal.io.FileCommitProtocol class CommitterBindingSuite extends SparkFunSuite { @@ -49,20 +49,18 @@ class CommitterBindingSuite extends SparkFunSuite { * [[BindingParquetOutputCommitter]] committer bind to the schema-specific * committer declared for the destination path? And that lifecycle events * are correctly propagated? - * This only works with a hadoop build where BindingPathOutputCommitter - * does passthrough of stream capabilities, so check that first. 
*/ test("BindingParquetOutputCommitter binds to the inner committer") { - val path = new Path("http://example/data") val job = newJob(path) val conf = job.getConfiguration conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") - val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0) + + StubPathOutputCommitterFactory.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) val parquet = new BindingParquetOutputCommitter(path, tContext) - val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning] + val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter] parquet.setupJob(tContext) assert(inner.jobSetup, s"$inner job not setup") parquet.setupTask(tContext) @@ -78,18 +76,6 @@ class CommitterBindingSuite extends SparkFunSuite { assert(inner.jobCommitted, s"$inner job not committed") parquet.abortJob(tContext, JobStatus.State.RUNNING) assert(inner.jobAborted, s"$inner job not aborted") - - val binding = new BindingPathOutputCommitter(path, tContext) - // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case - // is designed to work with versions with and without the feature. - if (binding.isInstanceOf[StreamCapabilities]) { - // this version of hadoop does support hasCapability probes - // through the BindingPathOutputCommitter used by the - // parquet committer, so verify that it goes through - // to the stub committer. - assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING), - s"committer $parquet does not declare dynamic partition support") - } } /** @@ -144,124 +130,17 @@ class CommitterBindingSuite extends SparkFunSuite { assert("file:///tmp" === protocol.destination) } - /* - * Bind a job to a committer which doesn't support dynamic partitioning. - * Job setup must fail, and calling `newTaskTempFileAbsPath()` must - * raise `UnsupportedOperationException`. - */ - test("reject dynamic partitioning if not supported") { - val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - StubPathOutputCommitterBinding.bind(conf, "http") - val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - path.toUri.toString, - true) - val ioe = intercept[IOException] { - committer.setupJob(tContext) - } - if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { - throw ioe - } - - // calls to newTaskTempFileAbsPath() will be rejected - intercept[UnsupportedOperationException] { - verifyAbsTempFileWorks(tContext, committer) - } - } - - /* - * Bind to a committer with dynamic partitioning support, - * verify that job and task setup works, and that - * `newTaskTempFileAbsPath()` creates a temp file which - * can be moved to an absolute path later. 
- */ - test("permit dynamic partitioning if the committer says it works") { - val path = new Path("http://example/data") - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http") - val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( - pathCommitProtocolClassname, - jobId, - path.toUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] - committer.setupJob(tContext) - committer.setupTask(tContext) - verifyAbsTempFileWorks(tContext, committer) - } - - /* - * Create a FileOutputCommitter through the PathOutputCommitProtocol - * using the relevant factory in hadoop-mapreduce-core JAR. - */ - test("FileOutputCommitter through PathOutputCommitProtocol") { - // temp path; use a unique filename - val jobCommitDir = File.createTempFile( - "FileOutputCommitter-through-PathOutputCommitProtocol", - "") - try { - // delete the temp file and create a temp dir. - jobCommitDir.delete(); - val jobUri = jobCommitDir.toURI - // hadoop path of the job - val path = new Path(jobUri) - val job = newJob(path) - val conf = job.getConfiguration - conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0) - conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) - bindToFileOutputCommitterFactory(conf, "file") - val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0) - val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate( + test("reject dynamic partitioning") { + val cause = intercept[InvocationTargetException] { + FileCommitProtocol.instantiate( pathCommitProtocolClassname, - jobId, - jobUri.toString, - true).asInstanceOf[PathOutputCommitProtocol] - committer.setupJob(tContext) - committer.setupTask(tContext) - verifyAbsTempFileWorks(tContext, committer) - } finally { - jobCommitDir.delete(); + jobId, "file:///tmp", true) + }.getCause + if (cause == null || !cause.isInstanceOf[IOException] + || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) { + throw cause } } - /** - * Verify that a committer supports `newTaskTempFileAbsPath()`. - * - * @param tContext task context - * @param committer committer - */ - private def verifyAbsTempFileWorks( - tContext: TaskAttemptContextImpl, - committer: FileCommitProtocol): Unit = { - val spec = FileNameSpec(".lotus.", ".123") - val absPath = committer.newTaskTempFileAbsPath( - tContext, - "/tmp", - spec) - assert(absPath.endsWith(".123"), s"wrong suffix in $absPath") - assert(absPath.contains("lotus"), s"wrong prefix in $absPath") - } - - /** - * Given a hadoop configuration, explicitly set up the factory binding for the scheme - * to a committer factory which always creates FileOutputCommitters. - * - * @param conf config to patch - * @param scheme filesystem scheme. - */ - def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = { - conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." 
+ scheme, - "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory") - } - } diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala index 5a0dba45ba8..88a36d227b1 100644 --- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala @@ -18,12 +18,10 @@ package org.apache.spark.internal.io.cloud import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, StreamCapabilities} +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory} -import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME} - /** * A local path output committer which tracks its state, for use in tests. * @param outputPath final destination. @@ -93,45 +91,10 @@ class StubPathOutputCommitterFactory extends PathOutputCommitterFactory { } private def workPath(out: Path): Path = new Path(out, - StubPathOutputCommitterBinding.TEMP_DIR_NAME) -} - -/** - * An extension which declares that it supports dynamic partitioning. - * @param outputPath final destination. - * @param workPath work path - * @param context task/job attempt. - */ -class StubPathOutputCommitterWithDynamicPartioning( - outputPath: Path, - workPath: Path, - context: TaskAttemptContext) extends StubPathOutputCommitter(outputPath, workPath, context) - with StreamCapabilities { - - override def hasCapability(capability: String): Boolean = - CAPABILITY_DYNAMIC_PARTITIONING == capability - + StubPathOutputCommitterFactory.TEMP_DIR_NAME) } - -class StubPathOutputCommitterWithDynamicPartioningFactory extends PathOutputCommitterFactory { - - override def createOutputCommitter( - outputPath: Path, - context: TaskAttemptContext): PathOutputCommitter = { - new StubPathOutputCommitterWithDynamicPartioning(outputPath, workPath(outputPath), context) - } - - private def workPath(out: Path): Path = new Path(out, - StubPathOutputCommitterBinding.TEMP_DIR_NAME) -} - - -/** - * Class to help binding job configurations to the different - * stub committers available. - */ -object StubPathOutputCommitterBinding { +object StubPathOutputCommitterFactory { /** * This is the "Pending" directory of the FileOutputCommitter; @@ -139,6 +102,11 @@ object StubPathOutputCommitterBinding { */ val TEMP_DIR_NAME = "_temporary" + /** + * Scheme prefix for per-filesystem scheme committers. + */ + val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + /** * Given a hadoop configuration, set up the factory binding for the scheme. * @param conf config to patch @@ -149,16 +117,4 @@ object StubPathOutputCommitterBinding { conf.set(key, classOf[StubPathOutputCommitterFactory].getName()) } - /** - * Bind the configuration/scheme to the stub committer which - * declares support for dynamic partitioning. - * - * @param conf config to patch - * @param scheme filesystem scheme. - */ - def bindWithDynamicPartitioning(conf: Configuration, scheme: String): Unit = { - val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." 
+ scheme - conf.set(key, - classOf[StubPathOutputCommitterWithDynamicPartioningFactory].getName()) - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org