Repository: spark
Updated Branches:
  refs/heads/master d6be46eb9 -> 928d0739c


[SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled

## What changes were proposed in this pull request?

With flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore 
corrupt Avro files, which is consistent with Parquet and Orc data source.

## How was this patch tested?

Unit test

Closes #22611 from gengliangwang/ignoreCorruptAvro.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928d0739
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928d0739
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928d0739

Branch: refs/heads/master
Commit: 928d0739c45d0fbb1d3bfc09c0ed7a213f09f3e5
Parents: d6be46e
Author: Gengliang Wang <[email protected]>
Authored: Wed Oct 3 17:08:55 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Wed Oct 3 17:08:55 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 78 +++++++++++++-------
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 43 +++++++++++
 2 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/928d0739/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 6df23c9..e60fa88 100755
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -32,14 +32,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[avro] class AvroFileFormat extends FileFormat
   with DataSourceRegister with Logging with Serializable {
@@ -59,36 +59,13 @@ private[avro] class AvroFileFormat extends FileFormat
     val conf = spark.sessionState.newHadoopConf()
     val parsedOptions = new AvroOptions(options, conf)
 
-    // Schema evolution is not supported yet. Here we only pick a single 
random sample file to
-    // figure out the schema of the whole dataset.
-    val sampleFile =
-      if (parsedOptions.ignoreExtension) {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("Files for schema inferring have 
been not found.")
-        }
-      } else {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. If files don't have .avro extension, set 
ignoreExtension to true")
-        }
-      }
-
     // User can specify an optional avro json schema.
     val avroSchema = parsedOptions.schema
       .map(new Schema.Parser().parse)
       .getOrElse {
-        val in = new FsInput(sampleFile.getPath, conf)
-        try {
-          val reader = DataFileReader.openReader(in, new 
GenericDatumReader[GenericRecord]())
-          try {
-            reader.getSchema
-          } finally {
-            reader.close()
-          }
-        } finally {
-          in.close()
-        }
-      }
+        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
+          spark.sessionState.conf.ignoreCorruptFiles)
+    }
 
     SchemaConverters.toSqlType(avroSchema).dataType match {
       case t: StructType => Some(t)
@@ -100,6 +77,51 @@ private[avro] class AvroFileFormat extends FileFormat
     }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean,
+      ignoreCorruptFiles: Boolean): Schema = {
+    // Schema evolution is not supported yet. Here we only pick first random 
readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FsInput(path, conf)
+        } { in =>
+          try {
+            Some(DataFileReader.openReader(in, new 
GenericDatumReader[GenericRecord]()))
+          } catch {
+            case e: IOException =>
+              if (ignoreCorruptFiles) {
+                logWarning(s"Skipped the footer in the corrupted file: $path", 
e)
+                None
+              } else {
+                throw new SparkException(s"Could not read file: $path", e)
+              }
+          }
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
+          reader.getSchema
+        } finally {
+          reader.close()
+        }
+      case None =>
+        throw new FileNotFoundException(
+          "No Avro files found. If files don't have .avro extension, set 
ignoreExtension to true")
+    }
+  }
+
   override def shortName(): String = "avro"
 
   override def isSplitable(

http://git-wip-us.apache.org/repos/asf/spark/blob/928d0739/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 9ad4388..1e08f7b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   import testImplicits._
@@ -342,6 +343,48 @@ class AvroSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
     }
   }
 
+  private def createDummyCorruptFile(dir: File): Unit = {
+    Utils.tryWithResource {
+      FileUtils.forceMkdir(dir)
+      val corruptFile = new File(dir, "corrupt.avro")
+      new BufferedWriter(new FileWriter(corruptFile))
+    } { writer =>
+      writer.write("corrupt")
+    }
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val result = 
spark.read.format("avro").load(srcFile.getAbsolutePath).collect()
+        checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), 
result)
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag 
IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath)
+        }.getMessage
+
+        assert(message.contains("Could not read file"))
+      }
+    }
+  }
+
   test("Date field type") {
     withTempPath { dir =>
       val schema = StructType(Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to