This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a2392be592b [SPARK-41862][SQL] Fix correctness bug related to DEFAULT values in Orc reader
a2392be592b is described below

commit a2392be592bf6aa75391ea50cbab77cde152f8ce
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Jan 4 09:30:42 2023 +0900

    [SPARK-41862][SQL] Fix correctness bug related to DEFAULT values in Orc reader
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a correctness bug related to column DEFAULT values in the Orc reader.
    
    * https://github.com/apache/spark/pull/37280 introduced a performance regression in the Orc reader.
    * https://github.com/apache/spark/pull/39362 fixed the performance regression, but stopped the column DEFAULT feature from working, causing a temporary correctness regression that we agreed to fix in a follow-up.
    * This PR restores column DEFAULT functionality for Orc scans and fixes the correctness regression without reintroducing the performance regression; a sketch of the approach follows below.
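
    In essence (a minimal standalone sketch follows; `Column` and `DefaultFiller` are illustrative stand-ins, not the actual Spark classes), the fix moves the existence-DEFAULT bookkeeping off the per-field write path: which columns need a default is computed once per reader from `requestedColIds`, and the defaults are applied in a single pass after each row is deserialized.

    ```scala
    // Illustrative sketch only; the real logic lives in OrcDeserializer and
    // operates on Spark's StructType/InternalRow.
    case class Column(name: String, existenceDefault: Option[Any])

    class DefaultFiller(columns: Array[Column], requestedColIds: Array[Int]) {
      // Computed once per reader: column i needs its existence DEFAULT only
      // when it is absent from the Orc file (requestedColIds(i) == -1).
      private val needsDefault: Array[Boolean] =
        columns.indices.map { i =>
          requestedColIds(i) == -1 && columns(i).existenceDefault.isDefined
        }.toArray

      // Called once per deserialized row, after all present fields are set;
      // no per-field-write bookkeeping remains on the hot path.
      def applyDefaults(row: Array[Any]): Array[Any] = {
        var i = 0
        while (i < row.length) {
          if (needsDefault(i)) row(i) = columns(i).existenceDefault.get
          i += 1
        }
        row
      }
    }
    ```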
    
    ### Why are the changes needed?
    
    This PR fixes a correctness bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR updates a unit test to verify that the Orc scan returns correct results.
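
    A hedged sketch of the scenario the test covers (the table name `t` and the literal values here are illustrative; the real assertions use checkAnswer in InsertSuite):

    ```scala
    // Rows written before ALTER TABLE ... ADD COLUMN ... DEFAULT must surface
    // the default when read back through the non-vectorized Orc reader.
    spark.sql("create table t(a string, i int) using orc")
    spark.sql("insert into t values ('xyz', 42)")
    spark.sql("alter table t add column (x boolean default true)")
    // Before this fix, the pre-existing row could read back x = null;
    // with it, the row reads back x = true.
    spark.sql("select * from t").show()
    ```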
    
    Closes #39370 from dtenedor/fix-perf-bug-orc-reader.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../datasources/orc/OrcDeserializer.scala          | 71 +++++-----------------
 .../org/apache/spark/sql/sources/InsertSuite.scala | 15 +----
 2 files changed, 19 insertions(+), 67 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 5b207a04ada..5bac404fd53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -42,21 +42,26 @@ class OrcDeserializer(
   //   is always null in this case
   // - a function that updates target column `index` otherwise.
   private val fieldWriters: Array[WritableComparable[_] => Unit] = {
+    // Assume we create a table backed by Orc files. Then if we later run a command "ALTER TABLE t
+    // ADD COLUMN c DEFAULT <value>" on the Orc table, this adds one field to the Catalyst schema.
+    // Then if we query the old files with the new Catalyst schema, we should only apply the
+    // existence default value to the columns whose IDs are not explicitly requested.
+    if (requiredSchema.hasExistenceDefaultValues) {
+      for (i <- 0 until requiredSchema.existenceDefaultValues.size) {
+        requiredSchema.existenceDefaultsBitmask(i) =
+          if (requestedColIds(i) != -1) {
+            false
+          } else {
+            requiredSchema.existenceDefaultValues(i) != null
+          }
+      }
+    }
     requiredSchema.zipWithIndex
       .map { case (f, index) =>
         if (requestedColIds(index) == -1) {
           null
         } else {
-          // Create a RowUpdater instance for converting Orc objects to Catalyst rows. If any fields
-          // in the Orc result schema have associated existence default values, maintain a
-          // boolean array to track which fields have been explicitly assigned for each row.
-          val rowUpdater: RowUpdater =
-            if (requiredSchema.hasExistenceDefaultValues) {
-              resetExistenceDefaultsBitmask(requiredSchema)
-              new RowUpdaterWithBitmask(resultRow, requiredSchema.existenceDefaultsBitmask)
-            } else {
-              new RowUpdater(resultRow)
-            }
+          val rowUpdater = new RowUpdater(resultRow)
           val writer = newWriter(f.dataType, rowUpdater)
           (value: WritableComparable[_]) => writer(index, value)
         }
@@ -93,6 +98,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
     resultRow
   }
 
@@ -288,49 +294,4 @@ class OrcDeserializer(
    override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
    override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
   }
-
-  /**
-   * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
-   * assignments are complete, it is possible to inspect the bitmask to determine which columns have
-   * been written at least once.
-   */
-  final class RowUpdaterWithBitmask(
-      row: InternalRow, bitmask: Array[Boolean]) extends RowUpdater(row) {
-    override def setNullAt(ordinal: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setNullAt(ordinal)
-    }
-    override def set(ordinal: Int, value: Any): Unit = {
-      bitmask(ordinal) = false
-      super.set(ordinal, value)
-    }
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = {
-      bitmask(ordinal) = false
-      super.setBoolean(ordinal, value)
-    }
-    override def setByte(ordinal: Int, value: Byte): Unit = {
-      bitmask(ordinal) = false
-      super.setByte(ordinal, value)
-    }
-    override def setShort(ordinal: Int, value: Short): Unit = {
-      bitmask(ordinal) = false
-      super.setShort(ordinal, value)
-    }
-    override def setInt(ordinal: Int, value: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setInt(ordinal, value)
-    }
-    override def setLong(ordinal: Int, value: Long): Unit = {
-      bitmask(ordinal) = false
-      super.setLong(ordinal, value)
-    }
-    override def setDouble(ordinal: Int, value: Double): Unit = {
-      bitmask(ordinal) = false
-      super.setDouble(ordinal, value)
-    }
-    override def setFloat(ordinal: Int, value: Float): Unit = {
-      bitmask(ordinal) = false
-      super.setFloat(ordinal, value)
-    }
-  }
 }
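
For context, the fieldWriters pattern that the change keeps on the hot path can be sketched standalone (simplified types and illustrative names; the real closures write Orc WritableComparable values into Spark's InternalRow): one writer closure per target column, null for columns absent from the file, so deserializing a row involves no per-field default bookkeeping.

```scala
// Illustrative sketch of the fieldWriters pattern, with simplified types.
object FieldWritersSketch {
  def main(args: Array[String]): Unit = {
    // Column 1 of the target schema is absent from the Orc file (-1).
    val requestedColIds = Array(0, -1, 1)
    val row = new Array[Any](requestedColIds.length)

    // One closure per target column, or null when the column is absent.
    val fieldWriters: Array[Any => Unit] =
      requestedColIds.zipWithIndex.map { case (colId, index) =>
        if (colId == -1) null
        else (value: Any) => row(index) = value
      }

    // Per-row hot path: invoke only the writers that exist.
    val orcValues = Array[Any]("xyz", 42) // file values, by file column id
    requestedColIds.zipWithIndex.foreach { case (colId, index) =>
      if (fieldWriters(index) != null) fieldWriters(index)(orcValues(colId))
    }
    // The absent column stays null here; the real code then fills its DEFAULT.
    println(row.mkString(", ")) // prints: xyz, null, 42
  }
}
```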
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7c4a39d6ff4..5df9b2ae598 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1552,7 +1552,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
     case class Config(
         sqlConf: Option[(String, String)],
-        insertNullsToStorage: Boolean = true,
         useDataFrames: Boolean = false)
     def runTest(dataSource: String, config: Config): Unit = {
       def insertIntoT(): Unit = {
@@ -1591,10 +1590,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           sql("insert into t values(null, null, null)")
         }
         sql("alter table t add column (x boolean default true)")
-        // By default, INSERT commands into some tables (such as JSON) do not store NULL values.
-        // Therefore, if such destination columns have DEFAULT values, SELECTing the same columns
-        // will return the default values (instead of NULL) since nothing is present in storage.
-        val insertedSColumn = if (config.insertNullsToStorage) null else "abcdef"
+        val insertedSColumn = null
         checkAnswer(spark.table("t"),
           Seq(
             Row("xyz", 42, "abcdef", true),
@@ -1679,8 +1675,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           Config(
             None),
           Config(
-            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false"),
-            insertNullsToStorage = false))),
+            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")))),
       TestCase(
         dataSource = "parquet",
         Seq(
@@ -1944,11 +1939,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
               Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl"))),
               Seq(Map(true -> "xyz"))),
             Row(2,
-              if (config.dataSource != "orc") {
-                null
-              } else {
-                Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl")))
-              },
+              null,
               Seq(Map(true -> "xyz"))),
             Row(3,
               Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
