This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 274142b  [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer
274142b is described below

commit 274142be08eb3a4239046d7f7260c7284ed041c2
Author: Ivan Vergiliev <ivan.vergil...@gmail.com>
AuthorDate: Wed Feb 20 21:49:38 2019 +0800

    [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer
    
    ## What changes were proposed in this pull request?
    
    This happens in a schema evolution use case, and only when a user specifies the schema manually and uses the non-vectorized ORC deserializer code path.
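
    For context, a hedged sketch of how this code path can be reached in `spark-shell`; the path, schema, and data below are illustrative assumptions, not part of this patch:

    ```scala
    // Force the non-vectorized ORC reader, which is the code path with the bug.
    spark.conf.set("spark.sql.orc.enableVectorizedReader", "false")
    import spark.implicits._
    import org.apache.spark.sql.types._

    val path = "/tmp/orc-schema-evolution"  // hypothetical location
    Seq((1, "a"), (2, "b")).toDF("c1", "c2").write.mode("overwrite").orc(path)

    // A manually specified read schema that adds a column in the middle of the
    // on-disk schema: the schema evolution scenario described above.
    val evolved = new StructType()
      .add("c1", IntegerType)
      .add("extra", StringType)  // not present in the ORC files
      .add("c2", StringType)

    spark.read.schema(evolved).orc(path).show()
    ```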
    
    There is a bug in `OrcDeserializer.scala` that results in `null`s being set at the wrong column position, and in state from previous records remaining uncleared in subsequent records. More details on exactly when the bug is triggered, and what the outcome is, can be found in the [JIRA issue](https://jira.apache.org/jira/browse/SPARK-26859).
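
    To make the indexing mismatch concrete, here is a minimal self-contained sketch of the pre-fix pattern, with toy types standing in for the real writer machinery (all names here are illustrative):

    ```scala
    object IndexBugSketch extends App {
      // -1 marks a requested column that is missing from the data file.
      val requestedColIds = Array(0, -1, 1)            // target column 1 is missing
      val sourceValues: Array[Any] = Array("a", null)  // one record read from the file
      val resultRow: Array[Any] = Array("old0", "old1", "old2")  // reused across records

      // Writers are built over the *filtered* columns, so their array positions
      // no longer line up with target row positions: writers(1) updates column 2.
      val writers = requestedColIds.zipWithIndex
        .filterNot { case (colId, _) => colId == -1 }
        .map { case (_, targetIndex) => (v: Any) => resultRow(targetIndex) = v }

      val validColIds = requestedColIds.filterNot(_ == -1)
      var i = 0
      while (i < validColIds.length) {
        val value = sourceValues(validColIds(i))
        if (value == null) resultRow(i) = null  // BUG: compacted index i clears
                                                // column 1 instead of column 2
        else writers(i)(value)                  // closures captured the right index
        i += 1
      }
      // Expected ("a", null, null); prints ("a", null, old2): the null landed in
      // the wrong slot, and column 2 kept the previous record's value.
      println(resultRow.mkString("(", ", ", ")"))
    }
    ```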
    
    The high-level summary is that this bug results in severe data correctness issues, but fortunately the set of conditions required to expose it is complicated, which keeps the affected surface area somewhat small.
    
    This change fixes the problem and adds a respective test.
    
    ## How was this patch tested?
    
    Passes Jenkins with the newly added test cases.
    
    Closes #23766 from IvanVergiliev/fix-orc-deserializer.
    
    Lead-authored-by: Ivan Vergiliev <ivan.vergil...@gmail.com>
    Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 096552ae4d6fcef5e20c54384a2687db41ba2fa1)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/orc/OrcDeserializer.scala          | 34 +++++++++++--------
 .../execution/datasources/ReadSchemaSuite.scala    |  6 ++++
 .../sql/execution/datasources/ReadSchemaTest.scala | 39 +++++++++++++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 4ecc54b..decd5c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -37,28 +37,34 @@ class OrcDeserializer(
 
   private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
 
+  // `fieldWriters(index)` is
+  // - null if the respective source column is missing, since the output value
+  //   is always null in this case
+  // - a function that updates target column `index` otherwise.
   private val fieldWriters: Array[WritableComparable[_] => Unit] = {
     requiredSchema.zipWithIndex
-      // The value of missing columns are always null, do not need writers.
-      .filterNot { case (_, index) => requestedColIds(index) == -1 }
       .map { case (f, index) =>
-        val writer = newWriter(f.dataType, new RowUpdater(resultRow))
-        (value: WritableComparable[_]) => writer(index, value)
+        if (requestedColIds(index) == -1) {
+          null
+        } else {
+          val writer = newWriter(f.dataType, new RowUpdater(resultRow))
+          (value: WritableComparable[_]) => writer(index, value)
+        }
       }.toArray
   }
 
-  private val validColIds = requestedColIds.filterNot(_ == -1)
-
   def deserialize(orcStruct: OrcStruct): InternalRow = {
-    var i = 0
-    while (i < validColIds.length) {
-      val value = orcStruct.getFieldValue(validColIds(i))
-      if (value == null) {
-        resultRow.setNullAt(i)
-      } else {
-        fieldWriters(i)(value)
+    var targetColumnIndex = 0
+    while (targetColumnIndex < fieldWriters.length) {
+      if (fieldWriters(targetColumnIndex) != null) {
+        val value = orcStruct.getFieldValue(requestedColIds(targetColumnIndex))
+        if (value == null) {
+          resultRow.setNullAt(targetColumnIndex)
+        } else {
+          fieldWriters(targetColumnIndex)(value)
+        }
       }
-      i += 1
+      targetColumnIndex += 1
     }
     resultRow
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
index 23c58e1..de234c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
@@ -72,6 +72,7 @@ class HeaderCSVReadSchemaSuite
 
 class JsonReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest
   with IntegralTypeTest
@@ -84,6 +85,7 @@ class JsonReadSchemaSuite
 
 class OrcReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
@@ -103,6 +105,7 @@ class OrcReadSchemaSuite
 
 class VectorizedOrcReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest
   with BooleanTypeTest
@@ -125,6 +128,7 @@ class VectorizedOrcReadSchemaSuite
 
 class ParquetReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
@@ -144,6 +148,7 @@ class ParquetReadSchemaSuite
 
 class VectorizedParquetReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
@@ -163,6 +168,7 @@ class VectorizedParquetReadSchemaSuite
 
 class MergedParquetReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala
index 2a5457e..17d9d43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala
@@ -69,7 +69,7 @@ trait ReadSchemaTest extends QueryTest with SQLTestUtils with SharedSQLContext {
 }
 
 /**
- * Add column (Case 1).
+ * Add column (Case 1-1).
  * This test suite assumes that the missing column should be `null`.
  */
 trait AddColumnTest extends ReadSchemaTest {
@@ -109,6 +109,43 @@ trait AddColumnTest extends ReadSchemaTest {
 }
 
 /**
+ * Add column into the middle (Case 1-2).
+ */
+trait AddColumnIntoTheMiddleTest extends ReadSchemaTest {
+  import testImplicits._
+
+  test("append column into middle") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null)).toDF("col1", "col2", "col3")
+      val df2 = Seq((10, null, 20, null), (40, "uvw", 50, "xyz"), (80, null, 90, null))
+        .toDF("col1", "col4", "col2", "col3")
+
+      val dir1 = s"$path${File.separator}part=one"
+      val dir2 = s"$path${File.separator}part=two"
+
+      df1.write.format(format).options(options).save(dir1)
+      df2.write.format(format).options(options).save(dir2)
+
+      val df = spark.read
+        .schema(df2.schema)
+        .format(format)
+        .options(options)
+        .load(path)
+
+      checkAnswer(df, Seq(
+        Row(1, null, 2, "abc", "one"),
+        Row(4, null, 5, "def", "one"),
+        Row(8, null, 9, null, "one"),
+        Row(10, null, 20, null, "two"),
+        Row(40, "uvw", 50, "xyz", "two"),
+        Row(80, null, 90, null, "two")))
+    }
+  }
+}
+
+/**
  * Hide column (Case 2-1).
  */
 trait HideColumnAtTheEndTest extends ReadSchemaTest {

