Repository: spark Updated Branches: refs/heads/master ed5c2dccd -> bbdfc0a40
[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x For Hadoop 1.x, `TaskAttemptContext` constructor clones the `Configuration` argument, thus configurations done in `HadoopFsRelation.prepareForWriteJob()` are not populated to *driver* side `TaskAttemptContext` (executor side configurations are properly populated). Currently this should only affect Parquet output committer class configuration. Author: Cheng Lian <l...@databricks.com> Closes #6669 from liancheng/spark-8121 and squashes the following commits: 73819e8 [Cheng Lian] Minor logging fix fce089c [Cheng Lian] Adds more logging b6f78a6 [Cheng Lian] Fixes compilation error introduced while rebasing 963a1aa [Cheng Lian] Addresses @yhuai's comment c3a0b1a [Cheng Lian] Fixes InsertIntoHadoopFsRelation job initialization Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbdfc0a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbdfc0a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbdfc0a4 Branch: refs/heads/master Commit: bbdfc0a40fb39760c122e7b9ce80aa1e340e55ee Parents: ed5c2dc Author: Cheng Lian <l...@databricks.com> Authored: Mon Jun 8 11:34:18 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Jun 8 11:34:18 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/SQLConf.scala | 1 + .../apache/spark/sql/parquet/newParquet.scala | 7 +++ .../org/apache/spark/sql/sources/commands.scala | 18 +++++-- .../spark/sql/parquet/ParquetIOSuite.scala | 52 +++++++++++++++++--- 4 files changed, 65 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index c778889..be786f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -76,6 +76,7 @@ private[spark] object SQLConf { // The output committer class used by FSBasedRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. + // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf` val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass" // Whether to perform eager analysis when constructing a dataframe. http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 5dda440..7af4eb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -212,6 +212,13 @@ private[sql] class ParquetRelation2( classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter]) + if (conf.get("spark.sql.parquet.output.committer.class") == null) { + logInfo("Using default output committer for Parquet: " + + classOf[ParquetOutputCommitter].getCanonicalName) + } else { + logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) + } + conf.setClass( SQLConf.OUTPUT_COMMITTER_CLASS, committerClass, http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index bd3aad6..c94199b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer( def driverSideSetup(): Unit = { setupIDs(0, 0, 0) setupConf() - taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) - // This preparation must happen before initializing output format and output committer, since - // their initialization involves the job configuration, which can be potentially decorated in - // `relation.prepareJobForWrite`. + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor + // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, + // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. + // + // Also, the `prepareJobForWrite` call must happen before initializing output format and output + // committer, since their initialization involve the job configuration, which can be potentially + // decorated in `prepareJobForWrite`. outputWriterFactory = relation.prepareJobForWrite(job) + taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) outputFormatClass = job.getOutputFormatClass outputCommitter = newOutputCommitter(taskAttemptContext) @@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer( SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter]) Option(committerClass).map { clazz => + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat // has an associated output committer. To override this output committer, // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. @@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer( }.getOrElse { // If output committer class is not set, we will use the one associated with the // file output format. - outputFormatClass.newInstance().getOutputCommitter(context) + val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) + logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}") + outputCommitter } } http://git-wip-us.apache.org/repos/asf/spark/blob/bbdfc0a4/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 2b6a270..46b2585 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.scalatest.BeforeAndAfterAll +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName} -import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} +import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.util.DateUtils @@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { withParquetDataFrame(allNulls :: Nil) { df => val rows = df.collect() - assert(rows.size === 1) + assert(rows.length === 1) assert(rows.head === Row(Seq.fill(5)(null): _*)) } } @@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { withParquetDataFrame(allNones :: Nil) { df => val rows = df.collect() - assert(rows.size === 1) + assert(rows.length === 1) assert(rows.head === Row(Seq.fill(3)(null): _*)) } } @@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } test("SPARK-6352 DirectParquetOutputCommitter") { + val clonedConf = new Configuration(configuration) + // Write to a parquet file and let it fail. // _temporary should be missing if direct output committer works. try { @@ -393,14 +397,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { val fs = path.getFileSystem(configuration) assert(!fs.exists(path)) } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } - finally { - configuration.set("spark.sql.parquet.output.committer.class", - "org.apache.parquet.hadoop.ParquetOutputCommitter") + } + + test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") { + withTempPath { dir => + val clonedConf = new Configuration(configuration) + + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName) + + configuration.set( + "spark.sql.parquet.output.committer.class", + classOf[BogusParquetOutputCommitter].getCanonicalName) + + try { + val message = intercept[SparkException] { + sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(message === "Intentional exception for testing purposes") + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + } } } } +class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) + extends ParquetOutputCommitter(outputPath, context) { + + override def commitJob(jobContext: JobContext): Unit = { + sys.error("Intentional exception for testing purposes") + } +} + class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll { private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org