This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new b3f7989 [SPARK-32672][SQL] Fix data corruption in boolean bit set compression
b3f7989 is described below
commit b3f79890f9a7d70c7d29f2ca1058540a4ed50e5e
Author: Robert (Bobby) Evans <[email protected]>
AuthorDate: Sat Aug 22 11:07:14 2020 +0900
[SPARK-32672][SQL] Fix data corruption in boolean bit set compression
## What changes were proposed in this pull request?
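This fixes SPARK-32672, a data-corruption bug. The `BooleanBitSet` compression
scheme's decoder would miss nulls at the end of a compressed batch. Its `decompress`
loop ran only while `visitedLocal < count`, so it could exit before reaching trailing
null positions. Those positions were then never marked null, and their values
defaulted to `false`. The loop now visits every position (`while (pos < capacity)`).
The analogous loop in `RunLengthEncoding` now uses the same `pos < capacity`
condition, with an assertion that `valueCountLocal` never exceeds `runLocal`.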
### Why are the changes needed?
It fixes data corruption: null boolean values at the end of a compressed batch were read back as `false`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I manually tested it against the original issue that was producing errors
for me, and I also added a unit test.
Closes #29506 from revans2/SPARK-32672.
Authored-by: Robert (Bobby) Evans <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 12f4331b9eb563cb0cfbf6a241d1d085ca4f7676)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../columnar/compression/compressionSchemes.scala | 6 ++---
.../columnar/compression/BooleanBitSetSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 00a1d54..3cc59af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -318,7 +318,8 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
var valueCountLocal = 0
var currentValueLocal: Long = 0
- while (valueCountLocal < runLocal || (pos < capacity)) {
+ while (pos < capacity) {
+ assert(valueCountLocal <= runLocal)
if (pos != nextNullIndex) {
if (valueCountLocal == runLocal) {
currentValueLocal = getFunction(buffer)
@@ -616,7 +617,6 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
override def hasNext: Boolean = visited < count
override def decompress(columnVector: WritableColumnVector, capacity:
Int): Unit = {
- val countLocal = count
var currentWordLocal: Long = 0
var visitedLocal: Int = 0
val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
@@ -626,7 +626,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
var pos = 0
var seenNulls = 0
- while (visitedLocal < countLocal) {
+ while (pos < capacity) {
if (pos != nextNullIndex) {
val bit = visitedLocal % BITS_PER_LONG
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index 2d71a42..1c229a33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -156,4 +156,30 @@ class BooleanBitSetSuite extends SparkFunSuite {
test(s"$BooleanBitSet: multiple words and 1 more bit for decompression()") {
skeletonForDecompress(BITS_PER_LONG * 2 + 1)
}
+
+  test(s"$BooleanBitSet: Only nulls for decompression()") {
+    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
+    val numRows = 10
+
+    val rows = Seq.fill[InternalRow](numRows)({
+      val row = new GenericInternalRow(1)
+      row.setNullAt(0)
+      row
+    })
+    rows.foreach(builder.appendFrom(_, 0))
+    val buffer = builder.build()
+
+    // Rewinds, skips column header and 4 more bytes for compression scheme ID
+    val headerSize = CompressionScheme.columnHeaderSize(buffer)
+    buffer.position(headerSize)
+    assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+    val columnVector = new OnHeapColumnVector(numRows, BooleanType)
+    decoder.decompress(columnVector, numRows)
+
+    (0 until numRows).foreach { rowNum =>
+      assert(columnVector.isNullAt(rowNum))
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]