Repository: spark
Updated Branches:
  refs/heads/branch-2.0 10c476fc8 -> 603424c16


[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?
This pull request adds a new option (maxMalformedLogPerPartition) to the CSV reader
to limit the maximum number of log messages Spark generates per partition for
malformed records.

With the default limit of 10, the warning log looks something like this:
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been 
found on this partition. Malformed records from now on will not be logged.
```

Closes #12173

## How was this patch tested?
Manually tested.

Author: Reynold Xin <r...@databricks.com>

Closes #13795 from rxin/SPARK-13792.

(cherry picked from commit c775bf09e0c3540f76de3f15d3fd35112a4912c1)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/603424c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/603424c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/603424c1

Branch: refs/heads/branch-2.0
Commit: 603424c161e9be670ee8461053225364cc700515
Parents: 10c476f
Author: Reynold Xin <r...@databricks.com>
Authored: Mon Jun 20 21:46:12 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jun 20 21:46:20 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  4 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala         |  9 ++++-
 .../execution/datasources/csv/CSVOptions.scala  |  2 +
 .../execution/datasources/csv/CSVRelation.scala | 42 +++++++++++++-------
 5 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 72fd184..89506ca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
         :param maxCharsPerColumn: defines the maximum number of characters 
allowed for any given
                                   value being read. If None is set, it uses 
the default value,
                                   ``1000000``.
+        :param maxMalformedLogPerPartition: sets the maximum number of 
malformed rows Spark will
+                                            log for each partition. Malformed 
records beyond this
+                                            number will be ignored. If None is 
set, it
+                                            uses the default value, ``10``.
         :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 841503b..35ba9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number 
of characters allowed
    * for any given value being read.</li>
+   * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number 
of malformed rows
+   * Spark will log for each partition. Malformed records beyond this number 
will be ignored.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt 
records
    *    during parsing.</li>
    * <ul>

http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index be52de8..12e19f9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -120,7 +120,14 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
 
       val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, 
headers)
       val parser = CSVRelation.csvParser(dataSchema, 
requiredSchema.fieldNames, csvOptions)
-      tokenizedIterator.flatMap(parser(_).toSeq)
+      var numMalformedRecords = 0
+      tokenizedIterator.flatMap { recordTokens =>
+        val row = parser(recordTokens, numMalformedRecords)
+        if (row.isEmpty) {
+          numMalformedRecords += 1
+        }
+        row
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9f4ce83..581eda7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -113,6 +113,8 @@ private[sql] class CSVOptions(@transient private val 
parameters: Map[String, Str
 
   val escapeQuotes = getBool("escapeQuotes", true)
 
+  val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
+
   val inputBufferSize = 128
 
   val isCommentSet = this.comment != '\u0000'

http://git-wip-us.apache.org/repos/asf/spark/blob/603424c1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index d72c8b9..083ac33 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -50,10 +50,19 @@ object CSVRelation extends Logging {
     }
   }
 
+  /**
+   * Returns a function that parses a single CSV record (in the form of an 
array of strings in which
+   * each element represents a column) and turns it into either one resulting 
row or no row (if the
+   * record is malformed).
+   *
+   * The 2nd argument in the returned function represents the total number of 
malformed rows
+   * observed so far.
+   */
+  // This is pretty convoluted and we should probably rewrite the entire CSV 
parsing soon.
   def csvParser(
       schema: StructType,
       requiredColumns: Array[String],
-      params: CSVOptions): Array[String] => Option[InternalRow] = {
+      params: CSVOptions): (Array[String], Int) => Option[InternalRow] = {
     val schemaFields = schema.fields
     val requiredFields = StructType(requiredColumns.map(schema(_))).fields
     val safeRequiredFields = if (params.dropMalformed) {
@@ -72,9 +81,16 @@ object CSVRelation extends Logging {
     val requiredSize = requiredFields.length
     val row = new GenericMutableRow(requiredSize)
 
-    (tokens: Array[String]) => {
+    (tokens: Array[String], numMalformedRows) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {
-        logWarning(s"Dropping malformed line: 
${tokens.mkString(params.delimiter.toString)}")
+        if (numMalformedRows < params.maxMalformedLogPerPartition) {
+          logWarning(s"Dropping malformed line: 
${tokens.mkString(params.delimiter.toString)}")
+        }
+        if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+          logWarning(
+            s"More than ${params.maxMalformedLogPerPartition} malformed 
records have been " +
+            "found on this partition. Malformed records from now on will not 
be logged.")
+        }
         None
       } else if (params.failFast && schemaFields.length != tokens.length) {
         throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
@@ -109,23 +125,21 @@ object CSVRelation extends Logging {
           Some(row)
         } catch {
           case NonFatal(e) if params.dropMalformed =>
-            logWarning("Parse exception. " +
-              s"Dropping malformed line: 
${tokens.mkString(params.delimiter.toString)}")
+            if (numMalformedRows < params.maxMalformedLogPerPartition) {
+              logWarning("Parse exception. " +
+                s"Dropping malformed line: 
${tokens.mkString(params.delimiter.toString)}")
+            }
+            if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+              logWarning(
+                s"More than ${params.maxMalformedLogPerPartition} malformed 
records have been " +
+                "found on this partition. Malformed records from now on will 
not be logged.")
+            }
             None
         }
       }
     }
   }
 
-  def parseCsv(
-      tokenizedRDD: RDD[Array[String]],
-      schema: StructType,
-      requiredColumns: Array[String],
-      options: CSVOptions): RDD[InternalRow] = {
-    val parser = csvParser(schema, requiredColumns, options)
-    tokenizedRDD.flatMap(parser(_).toSeq)
-  }
-
   // Skips the header line of each file if the `header` option is set to true.
   def dropHeaderLine(
       file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): 
Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to