Repository: spark
Updated Branches:
  refs/heads/master 982f3223b -> d556b3170


[SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanup

## What changes were proposed in this pull request?

This PR suggests adding some comments in `UnivocityParser` logics to explain 
what happens. Also, it proposes, IMHO, a little bit cleaner (at least easy for 
me to explain).

## How was this patch tested?

Unit tests in `CSVSuite`.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #17142 from HyukjinKwon/SPARK-18699.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d556b317
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d556b317
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d556b317

Branch: refs/heads/master
Commit: d556b317038455dc25e193f3add723fccdc54958
Parents: 982f322
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Fri Mar 3 00:50:58 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Mar 3 00:50:58 2017 -0800

----------------------------------------------------------------------
 .../datasources/csv/UnivocityParser.scala       | 97 ++++++++++++++------
 1 file changed, 68 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d556b317/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 804031a..3b3b87e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -54,39 +54,77 @@ private[csv] class UnivocityParser(
 
   private val dataSchema = StructType(schema.filter(_.name != 
options.columnNameOfCorruptRecord))
 
-  private val valueConverters =
-    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, 
options)).toArray
-
   private val tokenizer = new CsvParser(options.asParserSettings)
 
   private var numMalformedRecords = 0
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  // This gets the raw input that is parsed lately.
+  // In `PERMISSIVE` parse mode, we should be able to put the raw malformed 
row into the field
+  // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by 
this method.
   private def getCurrentInput(): String = 
tokenizer.getContext.currentParsedContent().stripLineEnd
 
-  // This parser loads an `indexArr._1`-th position value in input tokens,
-  // then put the value in `row(indexArr._2)`.
-  private val indexArr: Array[(Int, Int)] = {
-    val fields = if (options.dropMalformed) {
-      // If `dropMalformed` is enabled, then it needs to parse all the values
-      // so that we can decide which row is malformed.
-      requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
-    } else {
-      requiredSchema
-    }
-    // TODO: Revisit this; we need to clean up code here for readability.
-    // See an URL below for related discussions:
-    // https://github.com/apache/spark/pull/16928#discussion_r102636720
-    val fieldsWithIndexes = fields.zipWithIndex
-    corruptFieldIndex.map { case corrFieldIndex =>
-      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
-    }.getOrElse {
-      fieldsWithIndexes
-    }.map { case (f, i) =>
-      (dataSchema.indexOf(f), i)
-    }.toArray
+  // This parser loads an `tokenIndexArr`-th position value in input tokens,
+  // then put the value in `row(rowIndexArr)`.
+  //
+  // For example, let's say there is CSV data as below:
+  //
+  //   a,b,c
+  //   1,2,A
+  //
+  // Also, let's say `columnNameOfCorruptRecord` is set to "_unparsed", 
`header` is `true`
+  // by user and the user selects "c", "b", "_unparsed" and "a" fields. In 
this case, we need
+  // to map those values below:
+  //
+  //   required schema - ["c", "b", "_unparsed", "a"]
+  //   CSV data schema - ["a", "b", "c"]
+  //   required CSV data schema - ["c", "b", "a"]
+  //
+  // with the input tokens,
+  //
+  //   input tokens - [1, 2, "A"]
+  //
+  // Each input token is placed in each output row's position by mapping 
these. In this case,
+  //
+  //   output row - ["A", 2, null, 1]
+  //
+  // In more details,
+  // - `valueConverters`, input tokens - CSV data schema
+  //   `valueConverters` keeps the positions of input token indices (by its 
index) to each
+  //   value's converter (by its value) in an order of CSV data schema. In 
this case,
+  //   [string->int, string->int, string->string].
+  //
+  // - `tokenIndexArr`, input tokens - required CSV data schema
+  //   `tokenIndexArr` keeps the positions of input token indices (by its 
index) to reordered
+  //   fields given the required CSV data schema (by its value). In this case, 
[2, 1, 0].
+  //
+  // - `rowIndexArr`, input tokens - required schema
+  //   `rowIndexArr` keeps the positions of input token indices (by its index) 
to reordered
+  //   field indices given the required schema (by its value). In this case, 
[0, 1, 3].
+  private val valueConverters: Array[ValueConverter] =
+    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, 
options)).toArray
+
+  // Only used to create both `tokenIndexArr` and `rowIndexArr`. This variable 
means
+  // the fields that we should try to convert.
+  private val reorderedFields = if (options.dropMalformed) {
+    // If `dropMalformed` is enabled, then it needs to parse all the values
+    // so that we can decide which row is malformed.
+    requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
+  } else {
+    requiredSchema
+  }
+
+  private val tokenIndexArr: Array[Int] = {
+    reorderedFields
+      .filter(_.name != options.columnNameOfCorruptRecord)
+      .map(f => dataSchema.indexOf(f)).toArray
+  }
+
+  private val rowIndexArr: Array[Int] = if (corruptFieldIndex.isDefined) {
+    val corrFieldIndex = corruptFieldIndex.get
+    reorderedFields.indices.filter(_ != corrFieldIndex).toArray
+  } else {
+    reorderedFields.indices.toArray
   }
 
   /**
@@ -200,14 +238,15 @@ private[csv] class UnivocityParser(
   private def convert(tokens: Array[String]): Option[InternalRow] = {
     convertWithParseMode(tokens) { tokens =>
       var i: Int = 0
-      while (i < indexArr.length) {
-        val (pos, rowIdx) = indexArr(i)
+      while (i < tokenIndexArr.length) {
         // It anyway needs to try to parse since it decides if this row is 
malformed
         // or not after trying to cast in `DROPMALFORMED` mode even if the 
casted
         // value is not stored in the row.
-        val value = valueConverters(pos).apply(tokens(pos))
+        val from = tokenIndexArr(i)
+        val to = rowIndexArr(i)
+        val value = valueConverters(from).apply(tokens(from))
         if (i < requiredSchema.length) {
-          row(rowIdx) = value
+          row(to) = value
         }
         i += 1
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to