Repository: spark
Updated Branches:
  refs/heads/master eb69335cd -> 47776e7c0


[SPARK-17850][CORE] Add a flag to ignore corrupt files

## What changes were proposed in this pull request?

Add a flag to ignore corrupt files. For Spark core, the configuration is 
`spark.files.ignoreCorruptFiles`. For Spark SQL, it's 
`spark.sql.files.ignoreCorruptFiles`.

## How was this patch tested?

The added unit tests

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #15422 from zsxwing/SPARK-17850.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47776e7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47776e7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47776e7c

Branch: refs/heads/master
Commit: 47776e7c0c68590fe446cef910900b1aaead06f9
Parents: eb69335
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Oct 12 13:51:53 2016 -0700
Committer: Mridul Muralidharan <mmuralidharan@HW11853.local>
Committed: Wed Oct 12 13:51:53 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  5 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  8 ++-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 10 +++-
 .../test/scala/org/apache/spark/FileSuite.scala | 62 +++++++++++++++++++-
 .../sql/execution/datasources/FileScanRDD.scala | 30 +++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 +++
 .../datasources/FileSourceStrategySuite.scala   | 37 +++++++++++-
 7 files changed, 153 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5a71015..517fc3e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -156,4 +156,9 @@ package object config {
     .doc("Port to use for the block managed on the driver.")
     .fallbackConf(BLOCK_MANAGER_PORT)
 
+  private[spark] val IGNORE_CORRUPT_FILES = 
ConfigBuilder("spark.files.ignoreCorruptFiles")
+    .doc("Whether to ignore corrupt files. If true, the Spark jobs will 
continue to run when " +
+      "encountering corrupt files and contents that have been read will still 
be returned.")
+    .booleanConf
+    .createWithDefault(false)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 4640b5d..e1cf393 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.EOFException
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date
 
@@ -43,6 +43,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
@@ -139,6 +140,8 @@ class HadoopRDD[K, V](
 
   private val shouldCloneJobConf = 
sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for 
Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -253,8 +256,7 @@ class HadoopRDD[K, V](
         try {
           finished = !reader.next(key, value)
         } catch {
-          case eof: EOFException =>
-            finished = true
+          case e: IOException if ignoreCorruptFiles => finished = true
         }
         if (!finished) {
           inputMetrics.incRecordsRead(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 1c7aec9..baf31fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date
 
@@ -33,6 +34,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -85,6 +87,8 @@ class NewHadoopRDD[K, V](
 
   private val shouldCloneJobConf = 
sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value
     if (shouldCloneJobConf) {
@@ -179,7 +183,11 @@ class NewHadoopRDD[K, V](
 
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
+          try {
+            finished = !reader.nextKeyValue
+          } catch {
+            case e: IOException if ignoreCorruptFiles => finished = true
+          }
           if (finished) {
             // Close and release the reader here; close() will also be called 
when the task
             // completes, but for tasks that read from many files, it helps to 
release the

http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala 
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 993834f..cc52bb1 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark
 
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => 
NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => 
NewTextOutputFormat}
 
 import org.apache.spark.input.PortableDataStream
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -541,4 +543,62 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
         }.collect()
     assert(inputPaths.toSet === Set(s"$outDir/part-00000", 
s"$outDir/part-00001"))
   }
+
+  test("spark.files.ignoreCorruptFiles should work both HadoopRDD and 
NewHadoopRDD") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+
+      // Reading a corrupt gzip file should throw EOFException
+      sc = new SparkContext("local", "test")
+      // Test HadoopRDD
+      var e = intercept[SparkException] {
+        sc.textFile(inputFile.toURI.toString).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      // Test NewHadoopRDD
+      e = intercept[SparkException] {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      sc.stop()
+
+      val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
+      sc = new SparkContext("local", "test", conf)
+      // Test HadoopRDD
+      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+      // Test NewHadoopRDD
+      assert {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect().isEmpty
+      }
+    } finally {
+      inputFile.delete()
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index c66da3a..8994457 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.IOException
+
 import scala.collection.mutable
 
 import org.apache.spark.{Partition => RDDPartition, TaskContext}
@@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
 
 /**
  * A part (i.e. "block") of a single file that should be read, along with 
partition column values
@@ -62,6 +65,8 @@ class FileScanRDD(
     @transient val filePartitions: Seq[FilePartition])
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
+  private val ignoreCorruptFiles = 
sparkSession.sessionState.conf.ignoreCorruptFiles
+
   override def compute(split: RDDPartition, context: TaskContext): 
Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
       private val inputMetrics = context.taskMetrics().inputMetrics
@@ -119,7 +124,30 @@ class FileScanRDD(
           InputFileNameHolder.setInputFileName(currentFile.filePath)
 
           try {
-            currentIterator = readFunction(currentFile)
+            if (ignoreCorruptFiles) {
+              currentIterator = new NextIterator[Object] {
+                private val internalIter = readFunction(currentFile)
+
+                override def getNext(): AnyRef = {
+                  try {
+                    if (internalIter.hasNext) {
+                      internalIter.next()
+                    } else {
+                      finished = true
+                      null
+                    }
+                  } catch {
+                    case e: IOException =>
+                      finished = true
+                      null
+                  }
+                }
+
+                override def close(): Unit = {}
+              }
+            } else {
+              currentIterator = readFunction(currentFile)
+            }
           } catch {
             case e: java.io.FileNotFoundException =>
               throw new java.io.FileNotFoundException(

http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8cbfc4c..9e7c1ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -576,6 +576,12 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)
 
+  val IGNORE_CORRUPT_FILES = 
SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
+    .doc("Whether to ignore corrupt files. If true, the Spark jobs will 
continue to run when " +
+      "encountering corrupt files and contents that have been read will still 
be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -743,6 +749,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
 
+  def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
+
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)

http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45411fa..c5deb31 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.File
+import java.io._
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.GZIPOutputStream
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, 
RawLocalFileSystem}
@@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
     }
   }
 
+  test("spark.files.ignoreCorruptFiles should work in SQL") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.text(inputFile.toURI.toString).collect()
+        }
+        assert(e.getCause.isInstanceOf[EOFException])
+        assert(e.getCause.getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty)
+      }
+    } finally {
+      inputFile.delete()
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to