This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new df9d2f81954 [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils` df9d2f81954 is described below commit df9d2f81954ff5c14a60c1dc3dad69fb5e6a8152 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Mon Jul 4 11:56:12 2022 -0500 [SPARK-39653][SQL] Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils` ### What changes were proposed in this pull request? After SPARK-39638 and SPARK-39231, the `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` method in `ColumnVectorUtils` is only used by `ConstantColumnVectorBenchmark` and `ColumnVectorSuite`. So this PR makes the following changes: - Clean up `ColumnVectorUtils#populate(WritableColumnVector, InternalRow, int)` from `ColumnVectorUtils` - Add a simplified `populate` method to `ConstantColumnVectorBenchmark` - Remove the test `SPARK-38018: ColumnVectorUtils.populate to handle CalendarIntervalType correctly` from `ColumnVectorSuite` because this scenario no longer exists, and similar scenarios using `ConstantColumnVector` are already covered by `fill calendar interval` in `ColumnVectorUtils`. ### Why are the changes needed? Clean up unused code ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Executed `ConstantColumnVectorBenchmark` manually with [Benchmark GitHub Action](https://github.com/LuciferYang/spark/runs/7147111541); the result file is produced successfully. Since the results show no obvious change, they are not updated in this PR. Closes #37045 from LuciferYang/SPARK-39653. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../execution/vectorized/ColumnVectorUtils.java | 62 ---------------------- .../benchmark/ConstantColumnVectorBenchmark.scala | 34 ++++++++---- .../execution/vectorized/ColumnVectorSuite.scala | 11 +--- 3 files changed, 26 insertions(+), 81 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 7c885863ff0..a6960f733de 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -42,68 +42,6 @@ import org.apache.spark.unsafe.types.UTF8String; * These utilities are mostly used to convert ColumnVectors into other formats. */ public class ColumnVectorUtils { - /** - * Populates the entire `col` with `row[fieldIdx]` - */ - public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) { - int capacity = col.capacity; - DataType t = col.dataType(); - - if (row.isNullAt(fieldIdx)) { - col.putNulls(0, capacity); - } else { - if (t == DataTypes.BooleanType) { - col.putBooleans(0, capacity, row.getBoolean(fieldIdx)); - } else if (t == DataTypes.BinaryType) { - col.putByteArray(0, row.getBinary(fieldIdx)); - } else if (t == DataTypes.ByteType) { - col.putBytes(0, capacity, row.getByte(fieldIdx)); - } else if (t == DataTypes.ShortType) { - col.putShorts(0, capacity, row.getShort(fieldIdx)); - } else if (t == DataTypes.IntegerType) { - col.putInts(0, capacity, row.getInt(fieldIdx)); - } else if (t == DataTypes.LongType) { - col.putLongs(0, capacity, row.getLong(fieldIdx)); - } else if (t == DataTypes.FloatType) { - col.putFloats(0, capacity, row.getFloat(fieldIdx)); - } else if (t == DataTypes.DoubleType) { - col.putDoubles(0, capacity, row.getDouble(fieldIdx)); - } else if (t 
== DataTypes.StringType) { - UTF8String v = row.getUTF8String(fieldIdx); - byte[] bytes = v.getBytes(); - for (int i = 0; i < capacity; i++) { - col.putByteArray(i, bytes); - } - } else if (t instanceof DecimalType) { - DecimalType dt = (DecimalType)t; - Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale()); - if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { - col.putInts(0, capacity, (int)d.toUnscaledLong()); - } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { - col.putLongs(0, capacity, d.toUnscaledLong()); - } else { - final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); - byte[] bytes = integer.toByteArray(); - for (int i = 0; i < capacity; i++) { - col.putByteArray(i, bytes, 0, bytes.length); - } - } - } else if (t instanceof CalendarIntervalType) { - CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t); - col.getChild(0).putInts(0, capacity, c.months); - col.getChild(1).putInts(0, capacity, c.days); - col.getChild(2).putLongs(0, capacity, c.microseconds); - } else if (t instanceof DateType || t instanceof YearMonthIntervalType) { - col.putInts(0, capacity, row.getInt(fieldIdx)); - } else if (t instanceof TimestampType || t instanceof TimestampNTZType || - t instanceof DayTimeIntervalType) { - col.putLongs(0, capacity, row.getLong(fieldIdx)); - } else { - throw new RuntimeException(String.format("DataType %s is not supported" + - " in column vectorized reader.", t.sql())); - } - } - } /** * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala index 9e4902f2fb5..8046a4b6cc5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ConstantColumnVectorBenchmark.scala @@ -22,7 +22,7 @@ import org.apache.commons.lang3.RandomStringUtils import org.apache.spark.benchmark.Benchmark import org.apache.spark.benchmark.BenchmarkBase import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector} +import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector import org.apache.spark.unsafe.UTF8StringBuilder @@ -41,6 +41,22 @@ import org.apache.spark.unsafe.UTF8StringBuilder */ object ConstantColumnVectorBenchmark extends BenchmarkBase { + private def populate( + col: WritableColumnVector, batchSize: Int, row: InternalRow, fieldIdx: Int): Unit = { + col.dataType() match { + case IntegerType => col.putInts(0, batchSize, row.getInt(fieldIdx)) + case LongType => col.putLongs(0, batchSize, row.getLong(fieldIdx)) + case FloatType => col.putFloats(0, batchSize, row.getFloat(fieldIdx)) + case DoubleType => col.putDoubles(0, batchSize, row.getDouble(fieldIdx)) + case StringType => + val v = row.getUTF8String(fieldIdx) + val bytes = v.getBytes + (0 until batchSize).foreach { i => + col.putByteArray(i, bytes) + } + } + } + private def readValues(dataType: DataType, batchSize: Int, vector: ColumnVector): Unit = { dataType match { case IntegerType => @@ -86,14 +102,14 @@ object 
ConstantColumnVectorBenchmark extends BenchmarkBase { benchmark.addCase("OnHeapColumnVector") { _: Int => for (_ <- 0 until valuesPerIteration) { onHeapColumnVector.reset() - ColumnVectorUtils.populate(onHeapColumnVector, row, 0) + populate(onHeapColumnVector, batchSize, row, 0) } } benchmark.addCase("OffHeapColumnVector") { _: Int => for (_ <- 0 until valuesPerIteration) { offHeapColumnVector.reset() - ColumnVectorUtils.populate(offHeapColumnVector, row, 0) + populate(offHeapColumnVector, batchSize, row, 0) } } @@ -114,9 +130,9 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase { val constantColumnVector = new ConstantColumnVector(batchSize, dataType) onHeapColumnVector.reset() - ColumnVectorUtils.populate(onHeapColumnVector, row, 0) + populate(onHeapColumnVector, batchSize, row, 0) offHeapColumnVector.reset() - ColumnVectorUtils.populate(offHeapColumnVector, row, 0) + populate(offHeapColumnVector, batchSize, row, 0) ColumnVectorUtils.populate(constantColumnVector, row, 0) val other = if (dataType == StringType) { @@ -184,7 +200,7 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase { benchmark.addCase("OnHeapColumnVector") { _: Int => onHeapColumnVector.reset() - ColumnVectorUtils.populate(onHeapColumnVector, row, 0) + populate(onHeapColumnVector, batchSize, row, 0) for (_ <- 0 until valuesPerIteration) { readValues(dataType, batchSize, onHeapColumnVector) } @@ -192,7 +208,7 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase { benchmark.addCase("OffHeapColumnVector") { _: Int => offHeapColumnVector.reset() - ColumnVectorUtils.populate(offHeapColumnVector, row, 0) + populate(offHeapColumnVector, batchSize, row, 0) for (_ <- 0 until valuesPerIteration) { readValues(dataType, batchSize, offHeapColumnVector) } @@ -229,13 +245,13 @@ object ConstantColumnVectorBenchmark extends BenchmarkBase { } benchmark.addCase("OnHeapColumnVector") { _: Int => - for (i <- 0 until valuesPerIteration) { + for (_ <- 0 until valuesPerIteration) { (0 until 
batchSize).foreach(onHeapColumnVector.isNullAt) } } benchmark.addCase("OffHeapColumnVector") { _: Int => - for (i <- 0 until valuesPerIteration) { + for (_ <- 0 until valuesPerIteration) { (0 until batchSize).foreach(offHeapColumnVector.isNullAt) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 4cf2376a3fc..cdf41ed651d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.columnar.ColumnAccessor import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarArray -import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} +import org.apache.spark.unsafe.types.UTF8String class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { private def withVector( @@ -605,14 +605,5 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { } } } - - test("SPARK-38018: ColumnVectorUtils.populate to handle CalendarIntervalType correctly") { - val vector = new OnHeapColumnVector(5, CalendarIntervalType) - val row = new SpecificInternalRow(Array(CalendarIntervalType)) - val interval = new CalendarInterval(3, 5, 1000000) - row.setInterval(0, interval) - ColumnVectorUtils.populate(vector, row, 0) - assert(vector.getInterval(0) === interval) - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org