Repository: spark Updated Branches: refs/heads/master 10e37f6eb -> d2b2932d8
[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data ## What changes were proposed in this pull request? `OffHeapColumnVector.reserveInternal()` will only copy already inserted values during reallocation if `data != 0L`. In vectors containing arrays or structs this is incorrect, since the field `data` is not used at all there. We need to check `nulls` instead. ## How was this patch tested? Adds new tests to `ColumnVectorSuite` that reproduce the errors. Author: Ala Luszczak <a...@databricks.com> Closes #19308 from ala/vector-realloc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2b2932d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2b2932d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2b2932d Branch: refs/heads/master Commit: d2b2932d8be01dee31983121f6fffd16177bf48a Parents: 10e37f6 Author: Ala Luszczak <a...@databricks.com> Authored: Fri Sep 22 15:31:43 2017 +0200 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Fri Sep 22 15:31:43 2017 +0200 ---------------------------------------------------------------------- .../vectorized/OffHeapColumnVector.java | 2 +- .../vectorized/ColumnVectorSuite.scala | 26 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 3568275..e1d3685 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -515,7 +515,7 @@ public final class OffHeapColumnVector extends WritableColumnVector { // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { - int oldCapacity = (this.data == 0L) ? 0 : capacity; + int oldCapacity = (nulls == 0L) ? 0 : capacity; if (this.resultArray != null) { this.lengthData = Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 998067a..f7b06c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456) assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67) } + + test("[SPARK-22092] off-heap column vector reallocation corrupts array data") { + val arrayType = ArrayType(IntegerType, true) + testVector = new OffHeapColumnVector(8, arrayType) + + val data = testVector.arrayData() + (0 until 8).foreach(i => data.putInt(i, i)) + (0 until 8).foreach(i => testVector.putArray(i, i, 1)) + + // Increase vector's capacity and reallocate the data to new bigger buffers. + testVector.reserve(16) + + // Check that none of the values got lost/overwritten. 
+ val array = new ColumnVector.Array(testVector) + (0 until 8).foreach { i => + assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i)) + } + } + + test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") { + val structType = new StructType().add("int", IntegerType).add("double", DoubleType) + testVector = new OffHeapColumnVector(8, structType) + (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i)) + testVector.reserve(16) + (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0))) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org