This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new b3f7989  [SPARK-32672][SQL] Fix data corruption in boolean bit set 
compression
b3f7989 is described below

commit b3f79890f9a7d70c7d29f2ca1058540a4ed50e5e
Author: Robert (Bobby) Evans <[email protected]>
AuthorDate: Sat Aug 22 11:07:14 2020 +0900

    [SPARK-32672][SQL] Fix data corruption in boolean bit set compression
    
    ## What changes were proposed in this pull request?
    
    This fixes SPARK-32672, a data corruption bug.  Essentially, the BooleanBitSet 
CompressionScheme would miss nulls at the end of a CompressedBatch.  The affected 
values would then default to false instead of null.
    
    ### Why are the changes needed?
    It fixes a data corruption bug.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    I manually tested it against the original case that was producing errors 
for me.  I also added a unit test.
    
    Closes #29506 from revans2/SPARK-32672.
    
    Authored-by: Robert (Bobby) Evans <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit 12f4331b9eb563cb0cfbf6a241d1d085ca4f7676)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../columnar/compression/compressionSchemes.scala  |  6 ++---
 .../columnar/compression/BooleanBitSetSuite.scala  | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 00a1d54..3cc59af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -318,7 +318,8 @@ private[columnar] case object RunLengthEncoding extends 
CompressionScheme {
       var valueCountLocal = 0
       var currentValueLocal: Long = 0
 
-      while (valueCountLocal < runLocal || (pos < capacity)) {
+      while (pos < capacity) {
+        assert(valueCountLocal <= runLocal)
         if (pos != nextNullIndex) {
           if (valueCountLocal == runLocal) {
             currentValueLocal = getFunction(buffer)
@@ -616,7 +617,6 @@ private[columnar] case object BooleanBitSet extends 
CompressionScheme {
     override def hasNext: Boolean = visited < count
 
     override def decompress(columnVector: WritableColumnVector, capacity: 
Int): Unit = {
-      val countLocal = count
       var currentWordLocal: Long = 0
       var visitedLocal: Int = 0
       val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
@@ -626,7 +626,7 @@ private[columnar] case object BooleanBitSet extends 
CompressionScheme {
       var pos = 0
       var seenNulls = 0
 
-      while (visitedLocal < countLocal) {
+      while (pos < capacity) {
         if (pos != nextNullIndex) {
           val bit = visitedLocal % BITS_PER_LONG
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index 2d71a42..1c229a33 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -156,4 +156,30 @@ class BooleanBitSetSuite extends SparkFunSuite {
   test(s"$BooleanBitSet: multiple words and 1 more bit for decompression()") {
     skeletonForDecompress(BITS_PER_LONG * 2 + 1)
   }
+
+  test(s"$BooleanBitSet: Only nulls for decompression()") {
+    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, 
BooleanBitSet)
+    val numRows = 10
+
+    val rows = Seq.fill[InternalRow](numRows)({
+      val row = new GenericInternalRow(1)
+      row.setNullAt(0)
+      row
+    })
+    rows.foreach(builder.appendFrom(_, 0))
+    val buffer = builder.build()
+
+    // Rewinds, skips column header and 4 more bytes for compression scheme ID
+    val headerSize = CompressionScheme.columnHeaderSize(buffer)
+    buffer.position(headerSize)
+    assertResult(BooleanBitSet.typeId, "Wrong compression scheme 
ID")(buffer.getInt())
+
+    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+    val columnVector = new OnHeapColumnVector(numRows, BooleanType)
+    decoder.decompress(columnVector, numRows)
+
+    (0 until numRows).foreach { rowNum =>
+      assert(columnVector.isNullAt(rowNum))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to