Repository: spark Updated Branches: refs/heads/master d6be46eb9 -> 928d0739c
[SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled ## What changes were proposed in this pull request? With flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, which is consistent with the Parquet and ORC data sources. ## How was this patch tested? Unit test Closes #22611 from gengliangwang/ignoreCorruptAvro. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: hyukjinkwon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928d0739 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928d0739 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928d0739 Branch: refs/heads/master Commit: 928d0739c45d0fbb1d3bfc09c0ed7a213f09f3e5 Parents: d6be46e Author: Gengliang Wang <[email protected]> Authored: Wed Oct 3 17:08:55 2018 +0800 Committer: hyukjinkwon <[email protected]> Committed: Wed Oct 3 17:08:55 2018 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/avro/AvroFileFormat.scala | 78 +++++++++++++------- .../org/apache/spark/sql/avro/AvroSuite.scala | 43 +++++++++++ 2 files changed, 93 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/928d0739/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 6df23c9..e60fa88 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -32,14 +32,14 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import 
org.apache.hadoop.mapreduce.Job -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.sources.{DataSourceRegister, Filter} import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, Utils} private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister with Logging with Serializable { @@ -59,36 +59,13 @@ private[avro] class AvroFileFormat extends FileFormat val conf = spark.sessionState.newHadoopConf() val parsedOptions = new AvroOptions(options, conf) - // Schema evolution is not supported yet. Here we only pick a single random sample file to - // figure out the schema of the whole dataset. - val sampleFile = - if (parsedOptions.ignoreExtension) { - files.headOption.getOrElse { - throw new FileNotFoundException("Files for schema inferring have been not found.") - } - } else { - files.find(_.getPath.getName.endsWith(".avro")).getOrElse { - throw new FileNotFoundException( - "No Avro files found. If files don't have .avro extension, set ignoreExtension to true") - } - } - // User can specify an optional avro json schema. 
val avroSchema = parsedOptions.schema .map(new Schema.Parser().parse) .getOrElse { - val in = new FsInput(sampleFile.getPath, conf) - try { - val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()) - try { - reader.getSchema - } finally { - reader.close() - } - } finally { - in.close() - } - } + inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension, + spark.sessionState.conf.ignoreCorruptFiles) + } SchemaConverters.toSqlType(avroSchema).dataType match { case t: StructType => Some(t) @@ -100,6 +77,51 @@ private[avro] class AvroFileFormat extends FileFormat } } + private def inferAvroSchemaFromFiles( + files: Seq[FileStatus], + conf: Configuration, + ignoreExtension: Boolean, + ignoreCorruptFiles: Boolean): Schema = { + // Schema evolution is not supported yet. Here we only pick first random readable sample file to + // figure out the schema of the whole dataset. + val avroReader = files.iterator.map { f => + val path = f.getPath + if (!ignoreExtension && !path.getName.endsWith(".avro")) { + None + } else { + Utils.tryWithResource { + new FsInput(path, conf) + } { in => + try { + Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())) + } catch { + case e: IOException => + if (ignoreCorruptFiles) { + logWarning(s"Skipped the footer in the corrupted file: $path", e) + None + } else { + throw new SparkException(s"Could not read file: $path", e) + } + } + } + } + }.collectFirst { + case Some(reader) => reader + } + + avroReader match { + case Some(reader) => + try { + reader.getSchema + } finally { + reader.close() + } + case None => + throw new FileNotFoundException( + "No Avro files found. 
If files don't have .avro extension, set ignoreExtension to true") + } + } + override def shortName(): String = "avro" override def isSplitable( http://git-wip-us.apache.org/repos/asf/spark/blob/928d0739/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 9ad4388..1e08f7b 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { import testImplicits._ @@ -342,6 +343,48 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + private def createDummyCorruptFile(dir: File): Unit = { + Utils.tryWithResource { + FileUtils.forceMkdir(dir) + val corruptFile = new File(dir, "corrupt.avro") + new BufferedWriter(new FileWriter(corruptFile)) + } { writer => + writer.write("corrupt") + } + } + + test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + withTempPath { dir => + createDummyCorruptFile(dir) + val message = intercept[FileNotFoundException] { + spark.read.format("avro").load(dir.getAbsolutePath).schema + }.getMessage + assert(message.contains("No Avro files found.")) + + val srcFile = new File("src/test/resources/episodes.avro") + val destFile = new File(dir, "episodes.avro") + FileUtils.copyFile(srcFile, destFile) + + val result = 
spark.read.format("avro").load(srcFile.getAbsolutePath).collect() + checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result) + } + } + } + + test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + withTempPath { dir => + createDummyCorruptFile(dir) + val message = intercept[org.apache.spark.SparkException] { + spark.read.format("avro").load(dir.getAbsolutePath) + }.getMessage + + assert(message.contains("Could not read file")) + } + } + } + test("Date field type") { withTempPath { dir => val schema = StructType(Seq( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
