This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 274142b [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer 274142b is described below commit 274142be08eb3a4239046d7f7260c7284ed041c2 Author: Ivan Vergiliev <ivan.vergil...@gmail.com> AuthorDate: Wed Feb 20 21:49:38 2019 +0800 [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer ## What changes were proposed in this pull request? The bug occurs in a schema evolution use case, and only when a user specifies the schema manually and uses the non-vectorized ORC deserializer code path. There is a bug in `OrcDeserializer.scala` that results in `null`s being set at the wrong column position, and in state from previous records not being cleared for subsequent records. There are more details on exactly when the bug is triggered and what the outcome is in the [JIRA issue](https://jira.apache.org/jira/browse/SPARK-26859). The high-level summary is that this bug results in severe data correctness issues, but fortunately the set of conditions that expose the bug is complicated, which keeps the surface area somewhat small. This change fixes the problem and adds a corresponding test. ## How was this patch tested? Passed Jenkins with the newly added test cases. Closes #23766 from IvanVergiliev/fix-orc-deserializer. 
Lead-authored-by: Ivan Vergiliev <ivan.vergil...@gmail.com> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 096552ae4d6fcef5e20c54384a2687db41ba2fa1) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../datasources/orc/OrcDeserializer.scala | 34 +++++++++++-------- .../execution/datasources/ReadSchemaSuite.scala | 6 ++++ .../sql/execution/datasources/ReadSchemaTest.scala | 39 +++++++++++++++++++++- 3 files changed, 64 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala index 4ecc54b..decd5c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala @@ -37,28 +37,34 @@ class OrcDeserializer( private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType)) + // `fieldWriters(index)` is + // - null if the respective source column is missing, since the output value + // is always null in this case + // - a function that updates target column `index` otherwise. private val fieldWriters: Array[WritableComparable[_] => Unit] = { requiredSchema.zipWithIndex - // The value of missing columns are always null, do not need writers. 
- .filterNot { case (_, index) => requestedColIds(index) == -1 } .map { case (f, index) => - val writer = newWriter(f.dataType, new RowUpdater(resultRow)) - (value: WritableComparable[_]) => writer(index, value) + if (requestedColIds(index) == -1) { + null + } else { + val writer = newWriter(f.dataType, new RowUpdater(resultRow)) + (value: WritableComparable[_]) => writer(index, value) + } }.toArray } - private val validColIds = requestedColIds.filterNot(_ == -1) - def deserialize(orcStruct: OrcStruct): InternalRow = { - var i = 0 - while (i < validColIds.length) { - val value = orcStruct.getFieldValue(validColIds(i)) - if (value == null) { - resultRow.setNullAt(i) - } else { - fieldWriters(i)(value) + var targetColumnIndex = 0 + while (targetColumnIndex < fieldWriters.length) { + if (fieldWriters(targetColumnIndex) != null) { + val value = orcStruct.getFieldValue(requestedColIds(targetColumnIndex)) + if (value == null) { + resultRow.setNullAt(targetColumnIndex) + } else { + fieldWriters(targetColumnIndex)(value) + } } - i += 1 + targetColumnIndex += 1 } resultRow } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala index 23c58e1..de234c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala @@ -72,6 +72,7 @@ class HeaderCSVReadSchemaSuite class JsonReadSchemaSuite extends ReadSchemaSuite + with AddColumnIntoTheMiddleTest with HideColumnInTheMiddleTest with ChangePositionTest with IntegralTypeTest @@ -84,6 +85,7 @@ class JsonReadSchemaSuite class OrcReadSchemaSuite extends ReadSchemaSuite + with AddColumnIntoTheMiddleTest with HideColumnInTheMiddleTest with ChangePositionTest { @@ -103,6 +105,7 @@ class OrcReadSchemaSuite class VectorizedOrcReadSchemaSuite extends ReadSchemaSuite + 
with AddColumnIntoTheMiddleTest with HideColumnInTheMiddleTest with ChangePositionTest with BooleanTypeTest @@ -125,6 +128,7 @@ class VectorizedOrcReadSchemaSuite class ParquetReadSchemaSuite extends ReadSchemaSuite + with AddColumnIntoTheMiddleTest with HideColumnInTheMiddleTest with ChangePositionTest { @@ -144,6 +148,7 @@ class ParquetReadSchemaSuite class VectorizedParquetReadSchemaSuite extends ReadSchemaSuite + with AddColumnIntoTheMiddleTest with HideColumnInTheMiddleTest with ChangePositionTest { @@ -163,6 +168,7 @@ class VectorizedParquetReadSchemaSuite class MergedParquetReadSchemaSuite extends ReadSchemaSuite + with AddColumnIntoTheMiddleTest with HideColumnInTheMiddleTest with ChangePositionTest { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala index 2a5457e..17d9d43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala @@ -69,7 +69,7 @@ trait ReadSchemaTest extends QueryTest with SQLTestUtils with SharedSQLContext { } /** - * Add column (Case 1). + * Add column (Case 1-1). * This test suite assumes that the missing column should be `null`. */ trait AddColumnTest extends ReadSchemaTest { @@ -109,6 +109,43 @@ trait AddColumnTest extends ReadSchemaTest { } /** + * Add column into the middle (Case 1-2). 
+ */ +trait AddColumnIntoTheMiddleTest extends ReadSchemaTest { + import testImplicits._ + + test("append column into middle") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df1 = Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null)).toDF("col1", "col2", "col3") + val df2 = Seq((10, null, 20, null), (40, "uvw", 50, "xyz"), (80, null, 90, null)) + .toDF("col1", "col4", "col2", "col3") + + val dir1 = s"$path${File.separator}part=one" + val dir2 = s"$path${File.separator}part=two" + + df1.write.format(format).options(options).save(dir1) + df2.write.format(format).options(options).save(dir2) + + val df = spark.read + .schema(df2.schema) + .format(format) + .options(options) + .load(path) + + checkAnswer(df, Seq( + Row(1, null, 2, "abc", "one"), + Row(4, null, 5, "def", "one"), + Row(8, null, 9, null, "one"), + Row(10, null, 20, null, "two"), + Row(40, "uvw", 50, "xyz", "two"), + Row(80, null, 90, null, "two"))) + } + } +} + +/** * Hide column (Case 2-1). */ trait HideColumnAtTheEndTest extends ReadSchemaTest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org