This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a2392be592b [SPARK-41862][SQL] Fix correctness bug related to DEFAULT values in Orc reader
a2392be592b is described below
commit a2392be592bf6aa75391ea50cbab77cde152f8ce
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Jan 4 09:30:42 2023 +0900
[SPARK-41862][SQL] Fix correctness bug related to DEFAULT values in Orc reader
### What changes were proposed in this pull request?
This PR fixes a correctness bug related to column DEFAULT values in the Orc reader.
* https://github.com/apache/spark/pull/37280 introduced a performance regression in the Orc reader.
* https://github.com/apache/spark/pull/39362 fixed the performance regression, but stopped the column DEFAULT feature from working, causing a temporary correctness regression that we agreed to fix in this follow-up.
* This PR restores column DEFAULT functionality for Orc scans and fixes the correctness regression without reintroducing the performance regression (see the sketch below).
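For context, the behavior being restored can be sketched as follows (a hypothetical session that mirrors the updated unit test; the table name, columns, and default value are illustrative):

```scala
// Hypothetical session illustrating the restored behavior (values mirror the updated test).
spark.sql("CREATE TABLE t (a STRING, b INT) USING ORC")
spark.sql("INSERT INTO t VALUES ('xyz', 42)")
// The new column exists only in the Catalyst schema; Orc files written earlier have no field for it.
spark.sql("ALTER TABLE t ADD COLUMN (x BOOLEAN DEFAULT true)")
// Scanning the old files should surface the existence DEFAULT, not NULL, for the missing column.
spark.sql("SELECT a, b, x FROM t").show()
// Expected row: xyz, 42, true
```

Mechanically, the deserializer now computes once per schema which required columns are absent from the physical file (`requestedColIds(i) == -1`) and carry a non-null existence default, and applies those defaults to each deserialized row via `applyExistenceDefaultValuesToRow`, instead of tracking per-row assignments with the now-removed `RowUpdaterWithBitmask` subclass.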
### Why are the changes needed?
This PR fixes a correctness bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR updates a unit test to exercise the Orc scan path and verify that its results are correct.
Closes #39370 from dtenedor/fix-perf-bug-orc-reader.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../datasources/orc/OrcDeserializer.scala | 71 +++++-----------------
.../org/apache/spark/sql/sources/InsertSuite.scala | 15 +----
2 files changed, 19 insertions(+), 67 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 5b207a04ada..5bac404fd53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -42,21 +42,26 @@ class OrcDeserializer(
   //   is always null in this case
   //   - a function that updates target column `index` otherwise.
   private val fieldWriters: Array[WritableComparable[_] => Unit] = {
+    // Assume we create a table backed by Orc files. Then if we later run a command "ALTER TABLE t
+    // ADD COLUMN c DEFAULT <value>" on the Orc table, this adds one field to the Catalyst schema.
+    // Then if we query the old files with the new Catalyst schema, we should only apply the
+    // existence default value to the columns whose IDs are not explicitly requested.
+    if (requiredSchema.hasExistenceDefaultValues) {
+      for (i <- 0 until requiredSchema.existenceDefaultValues.size) {
+        requiredSchema.existenceDefaultsBitmask(i) =
+          if (requestedColIds(i) != -1) {
+            false
+          } else {
+            requiredSchema.existenceDefaultValues(i) != null
+          }
+      }
+    }
     requiredSchema.zipWithIndex
       .map { case (f, index) =>
         if (requestedColIds(index) == -1) {
           null
         } else {
-          // Create a RowUpdater instance for converting Orc objects to Catalyst rows. If any fields
-          // in the Orc result schema have associated existence default values, maintain a
-          // boolean array to track which fields have been explicitly assigned for each row.
-          val rowUpdater: RowUpdater =
-            if (requiredSchema.hasExistenceDefaultValues) {
-              resetExistenceDefaultsBitmask(requiredSchema)
-              new RowUpdaterWithBitmask(resultRow, requiredSchema.existenceDefaultsBitmask)
-            } else {
-              new RowUpdater(resultRow)
-            }
+          val rowUpdater = new RowUpdater(resultRow)
           val writer = newWriter(f.dataType, rowUpdater)
           (value: WritableComparable[_]) => writer(index, value)
         }
@@ -93,6 +98,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
     resultRow
   }
@@ -288,49 +294,4 @@ class OrcDeserializer(
     override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
     override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
   }
-
-  /**
-   * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
-   * assignments are complete, it is possible to inspect the bitmask to determine which columns have
-   * been written at least once.
-   */
-  final class RowUpdaterWithBitmask(
-      row: InternalRow, bitmask: Array[Boolean]) extends RowUpdater(row) {
-    override def setNullAt(ordinal: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setNullAt(ordinal)
-    }
-    override def set(ordinal: Int, value: Any): Unit = {
-      bitmask(ordinal) = false
-      super.set(ordinal, value)
-    }
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = {
-      bitmask(ordinal) = false
-      super.setBoolean(ordinal, value)
-    }
-    override def setByte(ordinal: Int, value: Byte): Unit = {
-      bitmask(ordinal) = false
-      super.setByte(ordinal, value)
-    }
-    override def setShort(ordinal: Int, value: Short): Unit = {
-      bitmask(ordinal) = false
-      super.setShort(ordinal, value)
-    }
-    override def setInt(ordinal: Int, value: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setInt(ordinal, value)
-    }
-    override def setLong(ordinal: Int, value: Long): Unit = {
-      bitmask(ordinal) = false
-      super.setLong(ordinal, value)
-    }
-    override def setDouble(ordinal: Int, value: Double): Unit = {
-      bitmask(ordinal) = false
-      super.setDouble(ordinal, value)
-    }
-    override def setFloat(ordinal: Int, value: Float): Unit = {
-      bitmask(ordinal) = false
-      super.setFloat(ordinal, value)
-    }
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7c4a39d6ff4..5df9b2ae598 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1552,7 +1552,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
     case class Config(
         sqlConf: Option[(String, String)],
-        insertNullsToStorage: Boolean = true,
         useDataFrames: Boolean = false)
     def runTest(dataSource: String, config: Config): Unit = {
       def insertIntoT(): Unit = {
@@ -1591,10 +1590,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           sql("insert into t values(null, null, null)")
         }
         sql("alter table t add column (x boolean default true)")
-        // By default, INSERT commands into some tables (such as JSON) do not store NULL values.
-        // Therefore, if such destination columns have DEFAULT values, SELECTing the same columns
-        // will return the default values (instead of NULL) since nothing is present in storage.
-        val insertedSColumn = if (config.insertNullsToStorage) null else "abcdef"
+        val insertedSColumn = null
         checkAnswer(spark.table("t"),
           Seq(
             Row("xyz", 42, "abcdef", true),
@@ -1679,8 +1675,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           Config(
             None),
           Config(
-            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false"),
-            insertNullsToStorage = false))),
+            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")))),
       TestCase(
         dataSource = "parquet",
         Seq(
@@ -1944,11 +1939,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl"))),
             Seq(Map(true -> "xyz"))),
           Row(2,
-            if (config.dataSource != "orc") {
-              null
-            } else {
-              Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl")))
-            },
+            null,
             Seq(Map(true -> "xyz"))),
           Row(3,
             Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]