http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala deleted file mode 100644 index 6b74014..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql._ -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.test.SQLTestData._ - -class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext { - import testImplicits._ - - private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize - private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning - - override protected def beforeAll(): Unit = { - super.beforeAll() - // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch - sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10) - - val pruningData = sparkContext.makeRDD((1 to 100).map { key => - val string = if (((key - 1) / 10) % 2 == 0) null else key.toString - TestData(key, string) - }, 5).toDF() - pruningData.registerTempTable("pruningData") - - // Enable in-memory partition pruning - sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) - // Enable in-memory table scan accumulators - sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true") - sqlContext.cacheTable("pruningData") - } - - override protected def afterAll(): Unit = { - try { - sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) - sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) - sqlContext.uncacheTable("pruningData") - } finally { - super.afterAll() - } - } - - // Comparisons - checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1)) - checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1)) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 12", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE key <= 11", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE key > 88", 1, 2)(89 to 100) - checkBatchPruning("SELECT key FROM pruningData WHERE key >= 89", 1, 2)(89 
to 100) - checkBatchPruning("SELECT key FROM pruningData WHERE 12 > key", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE 11 >= key", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE 88 < key", 1, 2)(89 to 100) - checkBatchPruning("SELECT key FROM pruningData WHERE 89 <= key", 1, 2)(89 to 100) - - // IS NULL - checkBatchPruning("SELECT key FROM pruningData WHERE value IS NULL", 5, 5) { - (1 to 10) ++ (21 to 30) ++ (41 to 50) ++ (61 to 70) ++ (81 to 90) - } - - // IS NOT NULL - checkBatchPruning("SELECT key FROM pruningData WHERE value IS NOT NULL", 5, 5) { - (11 to 20) ++ (31 to 40) ++ (51 to 60) ++ (71 to 80) ++ (91 to 100) - } - - // Conjunction and disjunction - checkBatchPruning("SELECT key FROM pruningData WHERE key > 8 AND key <= 21", 2, 3)(9 to 21) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR key > 99", 2, 2)(Seq(1, 100)) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 12 AND key IS NOT NULL", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR (key > 78 AND key < 92)", 3, 4) { - Seq(1) ++ (79 to 91) - } - checkBatchPruning("SELECT key FROM pruningData WHERE NOT (key < 88)", 1, 2) { - // Although the `NOT` operator isn't supported directly, the optimizer can transform - // `NOT (a < b)` to `b >= a` - 88 to 100 - } - - // With unsupported predicate - { - val seq = (1 to 30).mkString(", ") - checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq))", 5, 10)(31 to 100) - checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq)) AND key > 88", 1, 2) { - 89 to 100 - } - } - - def checkBatchPruning( - query: String, - expectedReadPartitions: Int, - expectedReadBatches: Int)( - expectedQueryResult: => Seq[Int]): Unit = { - - test(query) { - val df = sql(query) - val queryExecution = df.queryExecution - - assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") { - df.collect().map(_(0)).toArray - } - - 
val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect { - case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value) - }.head - - assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution") - assert( - readPartitions === expectedReadPartitions, - s"Wrong number of read partitions: $queryExecution") - } - } -}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala deleted file mode 100644 index 9a2948c..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.columnar.{BOOLEAN, NoopColumnStats} - -class BooleanBitSetSuite extends SparkFunSuite { - import BooleanBitSet._ - - def skeleton(count: Int) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet) - val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN)) - val values = rows.map(_.getBoolean(0)) - - rows.foreach(builder.appendFrom(_, 0)) - val buffer = builder.build() - - // Column type ID + null count + null positions - val headerSize = CompressionScheme.columnHeaderSize(buffer) - - // Compression scheme ID + element count + bitset words - val compressedSize = 4 + 4 + { - val extra = if (count % BITS_PER_LONG == 0) 0 else 1 - (count / BITS_PER_LONG + extra) * 8 - } - - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - // Skips column header - buffer.position(headerSize) - assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt()) - assertResult(count, "Wrong element count")(buffer.getInt()) - - var word = 0: Long - for (i <- 0 until count) { - val bit = i % BITS_PER_LONG - word = if (bit == 0) buffer.getLong() else word - assertResult(values(i), s"Wrong value in compressed buffer, index=$i") { - (word & ((1: Long) << bit)) != 0 - } - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = BooleanBitSet.decoder(buffer, BOOLEAN) - val mutableRow = new GenericMutableRow(1) - if (values.nonEmpty) { - values.foreach 
{ - assert(decoder.hasNext) - assertResult(_, "Wrong decoded value") { - decoder.next(mutableRow, 0) - mutableRow.getBoolean(0) - } - } - } - assert(!decoder.hasNext) - } - - test(s"$BooleanBitSet: empty") { - skeleton(0) - } - - test(s"$BooleanBitSet: less than 1 word") { - skeleton(BITS_PER_LONG - 1) - } - - test(s"$BooleanBitSet: exactly 1 word") { - skeleton(BITS_PER_LONG) - } - - test(s"$BooleanBitSet: multiple whole words") { - skeleton(BITS_PER_LONG * 2) - } - - test(s"$BooleanBitSet: multiple words and 1 more bit") { - skeleton(BITS_PER_LONG * 2 + 1) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala deleted file mode 100644 index acfab65..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar.compression - -import java.nio.ByteBuffer - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.AtomicType - -class DictionaryEncodingSuite extends SparkFunSuite { - testDictionaryEncoding(new IntColumnStats, INT) - testDictionaryEncoding(new LongColumnStats, LONG) - testDictionaryEncoding(new StringColumnStats, STRING) - - def testDictionaryEncoding[T <: AtomicType]( - columnStats: ColumnStats, - columnType: NativeColumnType[T]) { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - - def buildDictionary(buffer: ByteBuffer) = { - (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap - } - - def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) { - Seq.empty - } else { - seq.head +: seq.tail.filterNot(_ == seq.head) - } - - def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding) - val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) - val dictValues = stableDistinct(inputSeq) - - inputSeq.foreach(i => builder.appendFrom(rows(i), 0)) - - if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) { - withClue("Dictionary overflowed, compression should fail") { - intercept[Throwable] { - builder.build() - } - } - } else { - val buffer = builder.build() - val headerSize = CompressionScheme.columnHeaderSize(buffer) - // 4 extra bytes for dictionary size - val dictionarySize = 4 + rows.map(columnType.actualSize(_, 0)).sum - // 2 bytes for each `Short` - val compressedSize = 4 + dictionarySize + 2 * inputSeq.length - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, 
"Wrong buffer capacity")(buffer.capacity) - - // Skips column header - buffer.position(headerSize) - assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) - - val dictionary = buildDictionary(buffer).toMap - - dictValues.foreach { i => - assertResult(i, "Wrong dictionary entry") { - dictionary(values(i)) - } - } - - inputSeq.foreach { i => - assertResult(i.toShort, "Wrong column element value")(buffer.getShort()) - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = DictionaryEncoding.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) - - if (inputSeq.nonEmpty) { - inputSeq.foreach { i => - assert(decoder.hasNext) - assertResult(values(i), "Wrong decoded value") { - decoder.next(mutableRow, 0) - columnType.getField(mutableRow, 0) - } - } - } - - assert(!decoder.hasNext) - } - } - - test(s"$DictionaryEncoding with $typeName: empty") { - skeleton(0, Seq.empty) - } - - test(s"$DictionaryEncoding with $typeName: simple case") { - skeleton(2, Seq(0, 1, 0, 1)) - } - - test(s"$DictionaryEncoding with $typeName: dictionary overflow") { - skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala deleted file mode 100644 index 2111e9f..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.IntegralType - -class IntegralDeltaSuite extends SparkFunSuite { - testIntegralDelta(new IntColumnStats, INT, IntDelta) - testIntegralDelta(new LongColumnStats, LONG, LongDelta) - - def testIntegralDelta[I <: IntegralType]( - columnStats: ColumnStats, - columnType: NativeColumnType[I], - scheme: CompressionScheme) { - - def skeleton(input: Seq[I#InternalType]) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme) - val deltas = if (input.isEmpty) { - Seq.empty[Long] - } else { - (input.tail, input.init).zipped.map { - case (x: Int, y: Int) => (x - y).toLong - case (x: Long, y: Long) => x - y - } - } - - input.map { value => - val row = new GenericMutableRow(1) - columnType.setField(row, 0, value) - builder.appendFrom(row, 0) - } - - val buffer = builder.build() - // Column type ID + null count + null positions - val headerSize = 
CompressionScheme.columnHeaderSize(buffer) - - // Compression scheme ID + compressed contents - val compressedSize = 4 + (if (deltas.isEmpty) { - 0 - } else { - val oneBoolean = columnType.defaultSize - 1 + oneBoolean + deltas.map { - d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean - }.sum - }) - - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - buffer.position(headerSize) - assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt()) - - if (input.nonEmpty) { - assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get()) - assertResult(input.head, "The first value is wrong")(columnType.extract(buffer)) - - (input.tail, deltas).zipped.foreach { (value, delta) => - if (math.abs(delta) <= Byte.MaxValue) { - assertResult(delta, "Wrong delta")(buffer.get()) - } else { - assertResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get()) - assertResult(value, "Wrong value")(columnType.extract(buffer)) - } - } - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = scheme.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) - - if (input.nonEmpty) { - input.foreach{ - assert(decoder.hasNext) - assertResult(_, "Wrong decoded value") { - decoder.next(mutableRow, 0) - columnType.getField(mutableRow, 0) - } - } - } - assert(!decoder.hasNext) - } - - test(s"$scheme: empty column") { - skeleton(Seq.empty) - } - - test(s"$scheme: simple case") { - val input = columnType match { - case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int) - case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long) - } - - skeleton(input.map(_.asInstanceOf[I#InternalType])) - } - - test(s"$scheme: long random series") { - // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available 
here. - val input = Array.fill[Any](10000)(makeRandomValue(columnType)) - skeleton(input.map(_.asInstanceOf[I#InternalType])) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala deleted file mode 100644 index 67ec08f..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.AtomicType - -class RunLengthEncodingSuite extends SparkFunSuite { - testRunLengthEncoding(new NoopColumnStats, BOOLEAN) - testRunLengthEncoding(new ByteColumnStats, BYTE) - testRunLengthEncoding(new ShortColumnStats, SHORT) - testRunLengthEncoding(new IntColumnStats, INT) - testRunLengthEncoding(new LongColumnStats, LONG) - testRunLengthEncoding(new StringColumnStats, STRING) - - def testRunLengthEncoding[T <: AtomicType]( - columnStats: ColumnStats, - columnType: NativeColumnType[T]) { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - - def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding) - val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) - val inputSeq = inputRuns.flatMap { case (index, run) => - Seq.fill(run)(index) - } - - inputSeq.foreach(i => builder.appendFrom(rows(i), 0)) - val buffer = builder.build() - - // Column type ID + null count + null positions - val headerSize = CompressionScheme.columnHeaderSize(buffer) - - // Compression scheme ID + compressed contents - val compressedSize = 4 + inputRuns.map { case (index, _) => - // 4 extra bytes each run for run length - columnType.actualSize(rows(index), 0) + 4 - }.sum - - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - // Skips column header - buffer.position(headerSize) - assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) - - inputRuns.foreach { case (index, run) => - 
assertResult(values(index), "Wrong column element value")(columnType.extract(buffer)) - assertResult(run, "Wrong run length")(buffer.getInt()) - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = RunLengthEncoding.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) - - if (inputSeq.nonEmpty) { - inputSeq.foreach { i => - assert(decoder.hasNext) - assertResult(values(i), "Wrong decoded value") { - decoder.next(mutableRow, 0) - columnType.getField(mutableRow, 0) - } - } - } - - assert(!decoder.hasNext) - } - - test(s"$RunLengthEncoding with $typeName: empty column") { - skeleton(0, Seq.empty) - } - - test(s"$RunLengthEncoding with $typeName: simple case") { - skeleton(2, Seq(0 -> 2, 1 ->2)) - } - - test(s"$RunLengthEncoding with $typeName: run length == 1") { - skeleton(2, Seq(0 -> 1, 1 ->1)) - } - - test(s"$RunLengthEncoding with $typeName: single long run") { - skeleton(1, Seq(0 -> 1000)) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala deleted file mode 100644 index 5268dfe..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.types.AtomicType - -class TestCompressibleColumnBuilder[T <: AtomicType]( - override val columnStats: ColumnStats, - override val columnType: NativeColumnType[T], - override val schemes: Seq[CompressionScheme]) - extends NativeColumnBuilder(columnStats, columnType) - with NullableColumnBuilder - with CompressibleColumnBuilder[T] { - - override protected def isWorthCompressing(encoder: Encoder[T]) = true -} - -object TestCompressibleColumnBuilder { - def apply[T <: AtomicType]( - columnStats: ColumnStats, - columnType: NativeColumnType[T], - scheme: CompressionScheme): TestCompressibleColumnBuilder[T] = { - - val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme)) - builder.initialize(0, "", useCompression = true) - builder - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala new file mode 100644 index 0000000..b2d04f7 --- /dev/null +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types._ + +class ColumnStatsSuite extends SparkFunSuite { + testColumnStats(classOf[BooleanColumnStats], BOOLEAN, createRow(true, false, 0)) + testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0)) + testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0)) + testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0)) + testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0)) + testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0)) + testColumnStats(classOf[DoubleColumnStats], DOUBLE, + createRow(Double.MaxValue, Double.MinValue, 0)) + testColumnStats(classOf[StringColumnStats], STRING, createRow(null, null, 0)) + testDecimalColumnStats(createRow(null, null, 0)) + + def createRow(values: Any*): 
GenericInternalRow = new GenericInternalRow(values.toArray) + + def testColumnStats[T <: AtomicType, U <: ColumnStats]( + columnStatsClass: Class[U], + columnType: NativeColumnType[T], + initialStatistics: GenericInternalRow): Unit = { + + val columnStatsName = columnStatsClass.getSimpleName + + test(s"$columnStatsName: empty") { + val columnStats = columnStatsClass.newInstance() + columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { + case (actual, expected) => assert(actual === expected) + } + } + + test(s"$columnStatsName: non-empty") { + import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ + + val columnStats = columnStatsClass.newInstance() + val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) + rows.foreach(columnStats.gatherStats(_, 0)) + + val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) + val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] + val stats = columnStats.collectedStatistics + + assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) + assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) + assertResult(10, "Wrong null count")(stats.values(2)) + assertResult(20, "Wrong row count")(stats.values(3)) + assertResult(stats.values(4), "Wrong size in bytes") { + rows.map { row => + if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) + }.sum + } + } + } + + def testDecimalColumnStats[T <: AtomicType, U <: ColumnStats]( + initialStatistics: GenericInternalRow): Unit = { + + val columnStatsName = classOf[DecimalColumnStats].getSimpleName + val columnType = COMPACT_DECIMAL(15, 10) + + test(s"$columnStatsName: empty") { + val columnStats = new DecimalColumnStats(15, 10) + columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { + case (actual, expected) => assert(actual === expected) + } + } + + test(s"$columnStatsName: non-empty") { + import 
org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ + + val columnStats = new DecimalColumnStats(15, 10) + val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) + rows.foreach(columnStats.gatherStats(_, 0)) + + val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) + val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] + val stats = columnStats.collectedStatistics + + assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) + assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) + assertResult(10, "Wrong null count")(stats.values(2)) + assertResult(20, "Wrong row count")(stats.values(3)) + assertResult(stats.values(4), "Wrong size in bytes") { + rows.map { row => + if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) + }.sum + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala new file mode 100644 index 0000000..34dd969 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.sql.execution.columnar

import java.nio.{ByteOrder, ByteBuffer}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow}
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types._
import org.apache.spark.{Logging, SparkFunSuite}


/**
 * Checks the `defaultSize` and `actualSize` reported by each column type, that values survive
 * an append/extract round trip through a `ByteBuffer`, and which column type is selected for
 * decimals of different precision.
 */
class ColumnTypeSuite extends SparkFunSuite with Logging {
  private val DEFAULT_BUFFER_SIZE = 512
  private val MAP_TYPE = MAP(MapType(IntegerType, StringType))
  private val ARRAY_TYPE = ARRAY(ArrayType(IntegerType))
  private val STRUCT_TYPE = STRUCT(StructType(StructField("a", StringType) :: Nil))

  test("defaultSize") {
    val checks = Map(
      NULL -> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8,
      FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12,
      STRING -> 8, BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 16, MAP_TYPE -> 32)

    checks.foreach { case (columnType, expectedSize) =>
      assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
        columnType.defaultSize
      }
    }
  }

  test("actualSize") {
    // Converts `value` to its Catalyst form, projects it into an unsafe row and compares the
    // size the column type reports for that row with `expected`.
    def checkActualSize(
        columnType: ColumnType[_],
        value: Any,
        expected: Int): Unit = {

      assertResult(expected, s"Wrong actualSize for $columnType") {
        val row = new GenericMutableRow(1)
        row.update(0, CatalystTypeConverters.convertToCatalyst(value))
        val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
        columnType.actualSize(proj(row), 0)
      }
    }

    checkActualSize(NULL, null, 0)
    checkActualSize(BOOLEAN, true, 1)
    checkActualSize(BYTE, Byte.MaxValue, 1)
    checkActualSize(SHORT, Short.MaxValue, 2)
    checkActualSize(INT, Int.MaxValue, 4)
    checkActualSize(LONG, Long.MaxValue, 8)
    checkActualSize(FLOAT, Float.MaxValue, 4)
    checkActualSize(DOUBLE, Double.MaxValue, 8)
    checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
    checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
    checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
    checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
    checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
    checkActualSize(MAP_TYPE, Map(1 -> "a"), 29)
    checkActualSize(STRUCT_TYPE, Row("hello"), 28)
  }

  testNativeColumnType(BOOLEAN)
  testNativeColumnType(BYTE)
  testNativeColumnType(SHORT)
  testNativeColumnType(INT)
  testNativeColumnType(LONG)
  testNativeColumnType(FLOAT)
  testNativeColumnType(DOUBLE)
  testNativeColumnType(COMPACT_DECIMAL(15, 10))
  testNativeColumnType(STRING)

  testColumnType(NULL)
  testColumnType(BINARY)
  testColumnType(LARGE_DECIMAL(20, 10))
  testColumnType(STRUCT_TYPE)
  testColumnType(ARRAY_TYPE)
  testColumnType(MAP_TYPE)

  /** Registers the append/extract test for a native column type. */
  def testNativeColumnType[T <: AtomicType](columnType: NativeColumnType[T]): Unit = {
    testColumnType[T#InternalType](columnType)
  }

  /**
   * Registers a test that appends four random values of `columnType` to a buffer, reads them
   * back and compares each extracted value with the one that was written.
   */
  def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = {

    val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ByteOrder.nativeOrder())
    val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
    val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
    val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy())

    test(s"$columnType append/extract") {
      buffer.rewind()
      seq.foreach(columnType.append(_, 0, buffer))

      buffer.rewind()
      seq.foreach { row =>
        logInfo("buffer = " + buffer + ", expected = " + row)
        val expected = converter(row.get(0, columnType.dataType))
        val extracted = converter(columnType.extract(buffer))
        assert(expected === extracted,
          s"Extracted value didn't equal to the original one. $expected != $extracted, buffer =" +
          dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer]))
      }
    }
  }

  /**
   * Renders the remaining bytes of `buff` as space-separated, unpadded hexadecimal values.
   * Reading the bytes advances the position of `buff` to its limit.
   */
  private def dumpBuffer(buff: ByteBuffer): String = {
    val bytes = new Array[Byte](buff.remaining())
    buff.get(bytes)
    // `& 0xff` keeps the value in 0..255 so negative bytes do not print as 8 hex digits.
    bytes.map(b => Integer.toHexString(b & 0xff)).mkString(" ")
  }

  test("column type for decimal types with different precision") {
    (1 to 18).foreach { i =>
      assertResult(COMPACT_DECIMAL(i, 0)) {
        ColumnType(DecimalType(i, 0))
      }
    }

    assertResult(LARGE_DECIMAL(19, 0)) {
      ColumnType(DecimalType(19, 0))
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala new file mode 100644 index 0000000..9cae65e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.sql.execution.columnar

import scala.collection.immutable.HashSet
import scala.util.Random

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData}
import org.apache.spark.sql.types.{AtomicType, Decimal}
import org.apache.spark.unsafe.types.UTF8String

/** Builders for all-null rows and for randomly populated values and rows. */
object ColumnarTestUtils {
  /** Returns a mutable row with `length` fields, every one of them set to null. */
  def makeNullRow(length: Int): GenericMutableRow = {
    val nullRow = new GenericMutableRow(length)
    for (ordinal <- 0 until length) nullRow.setNullAt(ordinal)
    nullRow
  }

  /** Returns one random value of the kind matching `columnType`, cast to its JVM type. */
  def makeRandomValue[JvmType](columnType: ColumnType[JvmType]): JvmType = {
    def randomBytes(length: Int) = {
      val bytes = new Array[Byte](length)
      Random.nextBytes(bytes)
      bytes
    }

    def randomString(length: Int) = UTF8String.fromString(Random.nextString(length))

    val value = columnType match {
      case NULL => null
      case BOOLEAN => Random.nextBoolean()
      case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
      case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
      case INT => Random.nextInt()
      case LONG => Random.nextLong()
      case FLOAT => Random.nextFloat()
      case DOUBLE => Random.nextDouble()
      case STRING => randomString(Random.nextInt(32))
      case BINARY => randomBytes(Random.nextInt(32))
      case COMPACT_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale)
      case LARGE_DECIMAL(precision, scale) => Decimal(Random.nextLong(), precision, scale)
      case STRUCT(_) =>
        new GenericInternalRow(Array[Any](randomString(10)))
      case ARRAY(_) =>
        new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt()))
      case MAP(_) =>
        ArrayBasedMapData(Map(Random.nextInt() -> randomString(Random.nextInt(32))))
    }

    value.asInstanceOf[JvmType]
  }

  /** Varargs convenience overload of `makeRandomValues`. */
  def makeRandomValues(
      head: ColumnType[_],
      tail: ColumnType[_]*): Seq[Any] = {
    val allTypes: Seq[ColumnType[_]] = Seq(head) ++ tail
    makeRandomValues(allTypes)
  }

  /** Returns one random value per column type, in the same order as `columnTypes`. */
  def makeRandomValues(columnTypes: Seq[ColumnType[_]]): Seq[Any] = {
    for (columnType <- columnTypes) yield makeRandomValue(columnType)
  }

  /**
   * Returns `count` distinct random values of `columnType`; a candidate already present in
   * the set is discarded and a new one is drawn.
   */
  def makeUniqueRandomValues[JvmType](
      columnType: ColumnType[JvmType],
      count: Int): Seq[JvmType] = {

    val distinct = (0 until count).foldLeft(HashSet.empty[JvmType]) { (seen, _) =>
      val fresh = Iterator.continually(makeRandomValue(columnType)).filterNot(seen.contains).next()
      seen + fresh
    }
    distinct.toSeq
  }

  /** Varargs convenience overload of `makeRandomRow`. */
  def makeRandomRow(
      head: ColumnType[_],
      tail: ColumnType[_]*): InternalRow = {
    val allTypes: Seq[ColumnType[_]] = Seq(head) ++ tail
    makeRandomRow(allTypes)
  }

  /** Returns a row whose i-th field holds a random value of the i-th column type. */
  def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
    val randomRow = new GenericMutableRow(columnTypes.length)
    for ((value, ordinal) <- makeRandomValues(columnTypes).zipWithIndex) {
      randomRow.update(ordinal, value)
    }
    randomRow
  }

  /**
   * Returns `count` distinct random values together with one single-field row per value,
   * in matching order.
   */
  def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
      columnType: NativeColumnType[T],
      count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {

    def singleValueRow(value: T#InternalType): GenericMutableRow = {
      val row = new GenericMutableRow(1)
      row.update(0, value)
      row
    }

    val values = makeUniqueRandomValues(columnType, count)
    (values, values.map(singleValueRow))
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala new file mode
100644 index 0000000..25afed2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.sql.execution.columnar

import java.sql.{Date, Timestamp}

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel.MEMORY_ONLY

/** Query-level tests that read data back through the in-memory columnar cache. */
class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  setupTestData()

  /**
   * Runs `SELECT * FROM tableName`, caches the table and runs the same query again; both
   * results are compared with `expected`, which is re-evaluated for each comparison.
   */
  private def checkSelectAllBeforeAndAfterCaching(tableName: String)(
      expected: => Seq[Row]): Unit = {
    val selectAll = s"SELECT * FROM $tableName"

    checkAnswer(sql(selectAll), expected)

    sqlContext.cacheTable(tableName)

    checkAnswer(sql(selectAll), expected)
  }

  test("simple columnar query") {
    val executedPlan = sqlContext.executePlan(testData.logicalPlan).executedPlan
    val relation = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, executedPlan, None)

    checkAnswer(relation, testData.collect().toSeq)
  }

  test("default size avoids broadcast") {
    // TODO: Improve this test when we have better statistics
    val sizeTstData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
    sizeTstData.registerTempTable("sizeTst")
    sqlContext.cacheTable("sizeTst")

    val estimatedSize = sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes
    assert(estimatedSize > sqlContext.conf.autoBroadcastJoinThreshold)
  }

  test("projection") {
    val projected = testData.select('value, 'key)
    val executedPlan = sqlContext.executePlan(projected.logicalPlan).executedPlan
    val relation = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, executedPlan, None)

    // The relation yields (value, key), i.e. the columns of `testData` swapped.
    val swapped = testData.collect().map {
      case Row(key: Int, value: String) => value -> key
    }
    checkAnswer(relation, swapped.map(Row.fromTuple))
  }

  test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
    val executedPlan = sqlContext.executePlan(testData.logicalPlan).executedPlan
    val relation = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, executedPlan, None)

    // Scan the same relation twice.
    for (_ <- 1 to 2) {
      checkAnswer(relation, testData.collect().toSeq)
    }
  }

  test("SPARK-1678 regression: compression must not lose repeated values") {
    checkSelectAllBeforeAndAfterCaching("repeatedData") {
      repeatedData.collect().toSeq.map(Row.fromTuple)
    }
  }

  test("with null values") {
    checkSelectAllBeforeAndAfterCaching("nullableRepeatedData") {
      nullableRepeatedData.collect().toSeq.map(Row.fromTuple)
    }
  }

  test("SPARK-2729 regression: timestamp data type") {
    val timestamps = (0 to 3).map(i => Tuple1(new Timestamp(i))).toDF("time")
    timestamps.registerTempTable("timestamps")

    val selectTime = "SELECT time FROM timestamps"

    checkAnswer(sql(selectTime), timestamps.collect().toSeq)

    sqlContext.cacheTable("timestamps")

    checkAnswer(sql(selectTime), timestamps.collect().toSeq)
  }

  test("SPARK-3320 regression: batched column buffer building should work with empty partitions") {
    checkSelectAllBeforeAndAfterCaching("withEmptyParts") {
      withEmptyParts.collect().toSeq.map(Row.fromTuple)
    }
  }

  test("SPARK-4182 Caching complex types") {
    complexData.cache().count()
    // Shouldn't throw
    complexData.count()
    complexData.unpersist()
  }

  test("decimal type") {
    // Casting is required here because ScalaReflection can't capture decimal precision information.
    val decimalType = DecimalType(15, 10)
    val df = (1 to 10)
      .map(i => Tuple1(Decimal(i, 15, 10)))
      .toDF("dec")
      .select($"dec" cast decimalType)

    assert(df.schema.head.dataType === DecimalType(15, 10))

    df.cache().registerTempTable("test_fixed_decimal")

    val expectedRows = (1 to 10).map(i => Row(Decimal(i, 15, 10).toJavaBigDecimal))
    checkAnswer(sql("SELECT * FROM test_fixed_decimal"), expectedRows)
  }

  test("test different data types") {
    val tableName = "InMemoryCache_different_data_types"

    // Create the schema.
    val structType =
      StructType(
        StructField("f1", FloatType, true) ::
        StructField("f2", ArrayType(BooleanType), true) :: Nil)
    val dataTypes =
      Seq(StringType, BinaryType, NullType, BooleanType,
        ByteType, ShortType, IntegerType, LongType,
        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
        DateType, TimestampType,
        ArrayType(IntegerType), MapType(StringType, LongType), structType)
    val fields = for ((dataType, index) <- dataTypes.zipWithIndex) yield {
      StructField(s"col$index", dataType, true)
    }
    val allColumns = fields.map(_.name).mkString(",")
    val schema = StructType(fields)

    // Create a RDD for the schema
    val rowRdd =
      sparkContext.parallelize((1 to 10000), 10).map { i =>
        Row(
          s"str${i}: test cache.",
          s"binary${i}: test cache.".getBytes("UTF-8"),
          null,
          i % 2 == 0,
          i.toByte,
          i.toShort,
          i,
          Long.MaxValue - i.toLong,
          (i + 0.25).toFloat,
          (i + 0.75),
          BigDecimal(Long.MaxValue.toString + ".12345"),
          new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
          new Date(i),
          new Timestamp(i * 1000000L),
          (i to i + 10).toSeq,
          (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap,
          Row((i - 0.25).toFloat, Seq(true, false, null)))
      }
    sqlContext.createDataFrame(rowRdd, schema).registerTempTable(tableName)

    // Cache the table.
    sql(s"cache table $tableName")

    // Make sure the table is indeed cached.
    sqlContext.table(tableName).queryExecution.executedPlan
    assert(sqlContext.isCached(tableName), s"$tableName should be cached.")

    // Issue a query and check the results.
    checkAnswer(
      sql(s"SELECT DISTINCT ${allColumns} FROM $tableName"),
      sqlContext.table(tableName).collect())
    sqlContext.dropTempTable(tableName)
  }

  test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") {
    val source = sqlContext.range(1, 100).selectExpr("id % 10 as id")
      .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
    val cached = source.cache()

    // count triggers the caching action. It should not throw.
    cached.count()

    // Make sure the DataFrame is indeed cached.
    assert(sqlContext.cacheManager.lookupCachedData(cached).nonEmpty)

    // Check result against a DataFrame built the same way but never cached.
    val uncached = sqlContext.range(1, 100).selectExpr("id % 10 as id")
      .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
    checkAnswer(cached, uncached)

    // Drop the cache.
    cached.unpersist()
  }

  test("SPARK-10859: Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly") {
    val data = sqlContext.range(10).selectExpr("id", "cast(id as string) as s")
    data.cache()

    assert(data.count() === 10)

    val matching = data.filter($"s" === "3")
    assert(matching.count() === 1)
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala new file mode 100644 index 0000000..35dc9a2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.spark.sql.execution.columnar

import java.nio.ByteBuffer

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow}
import org.apache.spark.sql.types._

/** A basic column accessor with `NullableColumnAccessor` mixed in, for use by the tests. */
class TestNullableColumnAccessor[JvmType](
    buffer: ByteBuffer,
    columnType: ColumnType[JvmType])
  extends BasicColumnAccessor(buffer, columnType)
  with NullableColumnAccessor

object TestNullableColumnAccessor {
  /** Creates a [[TestNullableColumnAccessor]] reading `columnType` values from `buffer`. */
  def apply[JvmType](
      buffer: ByteBuffer,
      columnType: ColumnType[JvmType]): TestNullableColumnAccessor[JvmType] =
    new TestNullableColumnAccessor(buffer, columnType)
}

/** Registers the nullable-accessor tests once per supported column type. */
class NullableColumnAccessorSuite extends SparkFunSuite {
  import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._

  for (columnType <- Seq(
      NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
      STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
      STRUCT(StructType(StructField("a", StringType) :: Nil)),
      ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType)))) {
    testNullableColumnAccessor(columnType)
  }

  /**
   * Registers two tests for `columnType`: an empty column has nothing to read, and a column
   * built from alternating random and null rows is read back in that same order.
   */
  def testNullableColumnAccessor[JvmType](
      columnType: ColumnType[JvmType]): Unit = {

    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
    val nullRow = makeNullRow(1)

    test(s"Nullable $typeName column accessor: empty column") {
      val emptyBuffer = TestNullableColumnBuilder(columnType).build()
      val accessor = TestNullableColumnAccessor(emptyBuffer, columnType)
      assert(!accessor.hasNext)
    }

    test(s"Nullable $typeName column accessor: access null values") {
      val builder = TestNullableColumnBuilder(columnType)
      val randomRow = makeRandomRow(columnType)
      val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))

      // Four (value, null) pairs.
      for (_ <- 0 until 4) {
        builder.appendFrom(proj(randomRow), 0)
        builder.appendFrom(proj(nullRow), 0)
      }

      val accessor = TestNullableColumnAccessor(builder.build(), columnType)
      val target = new GenericMutableRow(1)
      val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)

      for (_ <- 0 until 4) {
        // First element of the pair: the random value.
        assert(accessor.hasNext)
        accessor.extractTo(target, 0)
        assert(converter(target.get(0, columnType.dataType))
          === converter(randomRow.get(0, columnType.dataType)))

        // Second element of the pair: the null.
        assert(accessor.hasNext)
        accessor.extractTo(target, 0)
        assert(target.isNullAt(0))
      }

      assert(!accessor.hasNext)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala new file mode 100644 index 0000000..93be3e1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.spark.sql.execution.columnar

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow}
import org.apache.spark.sql.types._

/** A basic column builder with `NullableColumnBuilder` mixed in and no-op statistics. */
class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
  extends BasicColumnBuilder[JvmType](new NoopColumnStats, columnType)
  with NullableColumnBuilder

object TestNullableColumnBuilder {
  /** Creates a builder for `columnType` and initializes it with `initialSize`. */
  def apply[JvmType](
      columnType: ColumnType[JvmType],
      initialSize: Int = 0): TestNullableColumnBuilder[JvmType] = {
    val initialized = new TestNullableColumnBuilder(columnType)
    initialized.initialize(initialSize)
    initialized
  }
}

/** Registers the nullable-builder tests once per listed column type. */
class NullableColumnBuilderSuite extends SparkFunSuite {
  import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._

  for (columnType <- Seq(
      BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
      STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
      STRUCT(StructType(StructField("a", StringType) :: Nil)),
      ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType)))) {
    testNullableColumnBuilder(columnType)
  }

  /**
   * Registers three tests for `columnType`: building an empty column, appending to a builder
   * created with the default initial size, and interleaving null rows with a random row.
   */
  def testNullableColumnBuilder[JvmType](
      columnType: ColumnType[JvmType]): Unit = {

    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
    val dataType = columnType.dataType
    val proj = UnsafeProjection.create(Array[DataType](dataType))
    val converter = CatalystTypeConverters.createToScalaConverter(dataType)

    test(s"$typeName column builder: empty column") {
      val built = TestNullableColumnBuilder(columnType).build()

      assertResult(0, "Wrong null count")(built.getInt())
      assert(!built.hasRemaining)
    }

    test(s"$typeName column builder: buffer size auto growth") {
      val columnBuilder = TestNullableColumnBuilder(columnType)
      val randomRow = makeRandomRow(columnType)

      for (_ <- 0 until 4) {
        columnBuilder.appendFrom(proj(randomRow), 0)
      }

      val built = columnBuilder.build()

      assertResult(0, "Wrong null count")(built.getInt())
    }

    test(s"$typeName column builder: null values") {
      val columnBuilder = TestNullableColumnBuilder(columnType)
      val randomRow = makeRandomRow(columnType)
      val nullRow = makeNullRow(1)

      // Values land at even positions, nulls at odd positions.
      for (_ <- 0 until 4) {
        columnBuilder.appendFrom(proj(randomRow), 0)
        columnBuilder.appendFrom(proj(nullRow), 0)
      }

      val built = columnBuilder.build()

      assertResult(4, "Wrong null count")(built.getInt())

      // For null positions
      for (expectedPosition <- 1 to 7 by 2) {
        assertResult(expectedPosition, "Wrong null position")(built.getInt())
      }

      // For non-null values
      val extracted = new GenericMutableRow(new Array[Any](1))
      for (_ <- 0 until 4) {
        columnType.extract(built, extracted, 0)
        assert(converter(extracted.get(0, dataType)) === converter(randomRow.get(0, dataType)),
          "Extracted value didn't equal to the original one")
      }

      assert(!built.hasRemaining)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala new file mode 100644 index 0000000..d762f7b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.sql.execution.columnar

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._

/**
 * Checks, for a cached table with a known partition/batch layout, how many partitions and
 * batches the in-memory columnar scan reads for different predicates.
 */
class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
  import testImplicits._

  // Lazy because they read `sqlContext`; `beforeAll` forces them before it changes either
  // setting so that `afterAll` restores the values that were in effect before this suite.
  private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize
  private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning

  override protected def beforeAll(): Unit = {
    super.beforeAll()

    // Take the snapshots now. A lazy val is initialized on first access; if that first access
    // only happened in `afterAll`, it would capture the values overridden below and the
    // "restore" would write those back instead of the originals.
    val _ = (originalColumnBatchSize, originalInMemoryPartitionPruning)

    // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
    sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)

    val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
      // `value` is null for keys 1-10, 21-30, ... and the key as a string otherwise.
      val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
      TestData(key, string)
    }, 5).toDF()
    pruningData.registerTempTable("pruningData")

    // Enable in-memory partition pruning
    sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
    // Enable in-memory table scan accumulators
    sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
    sqlContext.cacheTable("pruningData")
  }

  override protected def afterAll(): Unit = {
    try {
      sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
      sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
      sqlContext.uncacheTable("pruningData")
    } finally {
      super.afterAll()
    }
  }

  // Comparisons
  checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1))
  checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1))
  checkBatchPruning("SELECT key FROM pruningData WHERE key < 12", 1, 2)(1 to 11)
  checkBatchPruning("SELECT key FROM pruningData WHERE key <= 11", 1, 2)(1 to 11)
  checkBatchPruning("SELECT key FROM pruningData WHERE key > 88", 1, 2)(89 to 100)
  checkBatchPruning("SELECT key FROM pruningData WHERE key >= 89", 1, 2)(89 to 100)
  checkBatchPruning("SELECT key FROM pruningData WHERE 12 > key", 1, 2)(1 to 11)
  checkBatchPruning("SELECT key FROM pruningData WHERE 11 >= key", 1, 2)(1 to 11)
  checkBatchPruning("SELECT key FROM pruningData WHERE 88 < key", 1, 2)(89 to 100)
  checkBatchPruning("SELECT key FROM pruningData WHERE 89 <= key", 1, 2)(89 to 100)

  // IS NULL
  checkBatchPruning("SELECT key FROM pruningData WHERE value IS NULL", 5, 5) {
    (1 to 10) ++ (21 to 30) ++ (41 to 50) ++ (61 to 70) ++ (81 to 90)
  }

  // IS NOT NULL
  checkBatchPruning("SELECT key FROM pruningData WHERE value IS NOT NULL", 5, 5) {
    (11 to 20) ++ (31 to 40) ++ (51 to 60) ++ (71 to 80) ++ (91 to 100)
  }

  // Conjunction and disjunction
  checkBatchPruning("SELECT key FROM pruningData WHERE key > 8 AND key <= 21", 2, 3)(9 to 21)
  checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR key > 99", 2, 2)(Seq(1, 100))
  checkBatchPruning("SELECT key FROM pruningData WHERE key < 12 AND key IS NOT NULL", 1, 2)(1 to 11)
  checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR (key > 78 AND key < 92)", 3, 4) {
    Seq(1) ++ (79 to 91)
  }
  checkBatchPruning("SELECT key FROM pruningData WHERE NOT (key < 88)", 1, 2) {
    // Although the `NOT` operator isn't supported directly, the optimizer can transform
    // `NOT (a < b)` to `a >= b`
    88 to 100
  }

  // With unsupported predicate
  {
    val seq = (1 to 30).mkString(", ")
    checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq))", 5, 10)(31 to 100)
    checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq)) AND key > 88", 1, 2) {
      89 to 100
    }
  }

  /**
   * Registers a test, named after `query`, that runs the query and checks its result as well
   * as the partition and batch counters of the `InMemoryColumnarTableScan` in its plan.
   *
   * @param query SQL text to run; also used as the test name
   * @param expectedReadPartitions expected value of the scan's `readPartitions` accumulator
   * @param expectedReadBatches expected value of the scan's `readBatches` accumulator
   * @param expectedQueryResult expected first-column values, evaluated inside the test
   */
  def checkBatchPruning(
      query: String,
      expectedReadPartitions: Int,
      expectedReadBatches: Int)(
      expectedQueryResult: => Seq[Int]): Unit = {

    test(query) {
      val df = sql(query)
      val queryExecution = df.queryExecution

      assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
        df.collect().map(_(0)).toArray
      }

      // Read the counters only after `collect()` above has executed the scan.
      val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
        case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
      }.head

      assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
      assert(
        readPartitions === expectedReadPartitions,
        s"Wrong number of read partitions: $queryExecution")
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala new file mode 100644 index 0000000..ccbddef --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.sql.execution.columnar.compression

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}

/**
 * Tests for the [[BooleanBitSet]] compression scheme.
 *
 * Every test builds a compressed boolean column of a given length, checks the raw layout of the
 * resulting buffer, then decodes the buffer again and compares the output with the input.
 */
class BooleanBitSetSuite extends SparkFunSuite {
  import BooleanBitSet._

  /**
   * Compresses `count` random boolean rows with [[BooleanBitSet]] and verifies both directions.
   *
   * Encoder checks: total buffer capacity, compression scheme ID, element count and every
   * individual bit of the bitset words. Decoder checks: the decoder yields exactly the original
   * values, in order, and reports no further elements afterwards.
   *
   * @param count number of boolean values appended to the column; may be 0
   */
  def skeleton(count: Int): Unit = {
    // -------------
    // Tests encoder
    // -------------

    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
    val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN))
    val values = rows.map(_.getBoolean(0))

    rows.foreach(builder.appendFrom(_, 0))
    val buffer = builder.build()

    // Column type ID + null count + null positions
    val headerSize = CompressionScheme.columnHeaderSize(buffer)

    // Compression scheme ID (4 bytes) + element count (4 bytes) + bitset words (8 bytes each).
    // A trailing, partially filled word still occupies a full 8 bytes.
    val compressedSize = 4 + 4 + {
      val extra = if (count % BITS_PER_LONG == 0) 0 else 1
      (count / BITS_PER_LONG + extra) * 8
    }

    assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

    // Skips column header
    buffer.position(headerSize)
    assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
    assertResult(count, "Wrong element count")(buffer.getInt())

    // Walks over all bits; a new word is pulled from the buffer whenever the bit index within
    // the current word wraps around to 0.
    var word = 0L
    for (i <- 0 until count) {
      val bit = i % BITS_PER_LONG
      word = if (bit == 0) buffer.getLong() else word
      assertResult(values(i), s"Wrong value in compressed buffer, index=$i") {
        (word & (1L << bit)) != 0
      }
    }

    // -------------
    // Tests decoder
    // -------------

    // Rewinds, skips column header and 4 more bytes for compression scheme ID
    buffer.rewind().position(headerSize + 4)

    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
    val mutableRow = new GenericMutableRow(1)

    // The element has to be bound to a named parameter. With the placeholder form
    // (`values.foreach { assert(...); assertResult(_, ...) { ... } }`) only the last expression
    // becomes the function body, so `assert(decoder.hasNext)` would run a single time while the
    // argument block is evaluated instead of once per element.
    values.foreach { expected =>
      assert(decoder.hasNext)
      assertResult(expected, "Wrong decoded value") {
        decoder.next(mutableRow, 0)
        mutableRow.getBoolean(0)
      }
    }
    assert(!decoder.hasNext)
  }

  test(s"$BooleanBitSet: empty") {
    skeleton(0)
  }

  test(s"$BooleanBitSet: less than 1 word") {
    skeleton(BITS_PER_LONG - 1)
  }

  test(s"$BooleanBitSet: exactly 1 word") {
    skeleton(BITS_PER_LONG)
  }

  test(s"$BooleanBitSet: multiple whole words") {
    skeleton(BITS_PER_LONG * 2)
  }

  test(s"$BooleanBitSet: multiple words and 1 more bit") {
    skeleton(BITS_PER_LONG * 2 + 1)
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala new file mode 100644 index 0000000..830ca02 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.sql.execution.columnar.compression

import java.nio.ByteBuffer

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.AtomicType

/**
 * Tests for the [[DictionaryEncoding]] compression scheme, registered once per supported
 * column type (INT, LONG and STRING).
 */
class DictionaryEncodingSuite extends SparkFunSuite {
  testDictionaryEncoding(new IntColumnStats, INT)
  testDictionaryEncoding(new LongColumnStats, LONG)
  testDictionaryEncoding(new StringColumnStats, STRING)

  /**
   * Registers the dictionary-encoding test cases for one column type.
   *
   * @param columnStats statistics collector handed to the test column builder
   * @param columnType  column type whose values are encoded and decoded
   */
  def testDictionaryEncoding[T <: AtomicType](
      columnStats: ColumnStats,
      columnType: NativeColumnType[T]): Unit = {

    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")

    // Reads the serialized dictionary at the buffer's current position: an entry count followed
    // by that many values. Each value is mapped to its position in the dictionary.
    def buildDictionary(buffer: ByteBuffer) = {
      val entryCount = buffer.getInt()
      (0 until entryCount).map { code => columnType.extract(buffer) -> code.toShort }.toMap
    }

    // Distinct elements of `seq` in order of first occurrence. The previous hand-rolled version
    // only dropped repetitions of the head element, so e.g. Seq(0, 1, 0, 1) became Seq(0, 1, 1).
    def stableDistinct(seq: Seq[Int]): Seq[Int] = seq.distinct

    /**
     * Appends the rows selected by `inputSeq` to a dictionary-encoded column and verifies the
     * encoder output and the decoder. When the number of distinct inputs exceeds
     * `DictionaryEncoding.MAX_DICT_SIZE`, building the column is expected to throw instead.
     *
     * @param uniqueValueCount number of unique values / single-value rows to generate
     * @param inputSeq         indices into the generated rows, in append order
     */
    def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]): Unit = {
      // -------------
      // Tests encoder
      // -------------

      val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
      val dictValues = stableDistinct(inputSeq)

      inputSeq.foreach(i => builder.appendFrom(rows(i), 0))

      if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
        withClue("Dictionary overflowed, compression should fail") {
          intercept[Throwable] {
            builder.build()
          }
        }
      } else {
        val buffer = builder.build()
        val headerSize = CompressionScheme.columnHeaderSize(buffer)
        // 4 extra bytes for dictionary size. Only values that were actually appended are
        // counted, so the expectation also holds when `inputSeq` skips some generated rows.
        val dictionarySize = 4 + dictValues.map(i => columnType.actualSize(rows(i), 0)).sum
        // 4 bytes for the compression scheme type ID, 2 bytes for each `Short` code
        val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
        assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

        // Skips column header
        buffer.position(headerSize)
        assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())

        val dictionary = buildDictionary(buffer)

        dictValues.foreach { i =>
          assertResult(i, "Wrong dictionary entry") {
            dictionary(values(i))
          }
        }

        inputSeq.foreach { i =>
          assertResult(i.toShort, "Wrong column element value")(buffer.getShort())
        }

        // -------------
        // Tests decoder
        // -------------

        // Rewinds, skips column header and 4 more bytes for compression scheme ID
        buffer.rewind().position(headerSize + 4)

        val decoder = DictionaryEncoding.decoder(buffer, columnType)
        val mutableRow = new GenericMutableRow(1)

        inputSeq.foreach { i =>
          assert(decoder.hasNext)
          assertResult(values(i), "Wrong decoded value") {
            decoder.next(mutableRow, 0)
            columnType.getField(mutableRow, 0)
          }
        }

        assert(!decoder.hasNext)
      }
    }

    test(s"$DictionaryEncoding with $typeName: empty") {
      skeleton(0, Seq.empty)
    }

    test(s"$DictionaryEncoding with $typeName: simple case") {
      skeleton(2, Seq(0, 1, 0, 1))
    }

    test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
      skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala new file mode 100644 index 0000000..988a577 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.spark.sql.execution.columnar.compression

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.IntegralType

/**
 * Tests for the delta compression schemes for integral columns (`IntDelta` and `LongDelta`).
 */
class IntegralDeltaSuite extends SparkFunSuite {
  testIntegralDelta(new IntColumnStats, INT, IntDelta)
  testIntegralDelta(new LongColumnStats, LONG, LongDelta)

  /**
   * Registers the delta-encoding test cases for one integral column type.
   *
   * @param columnStats statistics collector handed to the test column builder
   * @param columnType  column type whose values are encoded and decoded
   * @param scheme      delta compression scheme under test
   */
  def testIntegralDelta[I <: IntegralType](
      columnStats: ColumnStats,
      columnType: NativeColumnType[I],
      scheme: CompressionScheme): Unit = {

    /**
     * Compresses `input` with `scheme` and verifies the encoded bytes and the decoder output.
     *
     * Layout asserted below: the first value is written in full after an escape mark
     * (`Byte.MinValue`); every following element is either a single delta byte, when the
     * absolute delta fits into `Byte.MaxValue`, or an escape mark followed by the full value.
     */
    def skeleton(input: Seq[I#InternalType]): Unit = {
      // -------------
      // Tests encoder
      // -------------

      val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme)
      // Differences between each element and its predecessor, widened to Long
      val deltas = if (input.isEmpty) {
        Seq.empty[Long]
      } else {
        (input.tail, input.init).zipped.map {
          case (x: Int, y: Int) => (x - y).toLong
          case (x: Long, y: Long) => x - y
        }
      }

      input.foreach { value =>
        val row = new GenericMutableRow(1)
        columnType.setField(row, 0, value)
        builder.appendFrom(row, 0)
      }

      val buffer = builder.build()
      // Column type ID + null count + null positions
      val headerSize = CompressionScheme.columnHeaderSize(buffer)

      // Compression scheme ID (4 bytes) + compressed contents. The emptiness check must look at
      // `input`, not `deltas`: a single-element input has no deltas, yet its first value is
      // still written as escape mark + value.
      val compressedSize = 4 + (if (input.isEmpty) {
        0
      } else {
        val valueSize = columnType.defaultSize
        val deltaSizes = deltas.map { d =>
          if (math.abs(d) <= Byte.MaxValue) 1 else 1 + valueSize
        }
        1 + valueSize + deltaSizes.sum
      })

      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)

      // Skips column header
      buffer.position(headerSize)
      assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())

      if (input.nonEmpty) {
        assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
        assertResult(input.head, "The first value is wrong")(columnType.extract(buffer))

        (input.tail, deltas).zipped.foreach { (value, delta) =>
          if (math.abs(delta) <= Byte.MaxValue) {
            assertResult(delta, "Wrong delta")(buffer.get())
          } else {
            assertResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
            assertResult(value, "Wrong value")(columnType.extract(buffer))
          }
        }
      }

      // -------------
      // Tests decoder
      // -------------

      // Rewinds, skips column header and 4 more bytes for compression scheme ID
      buffer.rewind().position(headerSize + 4)

      val decoder = scheme.decoder(buffer, columnType)
      val mutableRow = new GenericMutableRow(1)

      // The element has to be bound to a named parameter. With the placeholder form
      // (`input.foreach { assert(...); assertResult(_, ...) { ... } }`) only the last expression
      // becomes the function body, so `assert(decoder.hasNext)` would run a single time while
      // the argument block is evaluated instead of once per element.
      input.foreach { expected =>
        assert(decoder.hasNext)
        assertResult(expected, "Wrong decoded value") {
          decoder.next(mutableRow, 0)
          columnType.getField(mutableRow, 0)
        }
      }
      assert(!decoder.hasNext)
    }

    test(s"$scheme: empty column") {
      skeleton(Seq.empty)
    }

    test(s"$scheme: simple case") {
      val input = columnType match {
        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
      }

      skeleton(input.map(_.asInstanceOf[I#InternalType]))
    }

    test(s"$scheme: long random series") {
      // Have to workaround with `Any` since no `ClassTag[I#InternalType]` available here.
      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
      skeleton(input.map(_.asInstanceOf[I#InternalType]))
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala new file mode 100644 index 0000000..ce3affb --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.spark.sql.execution.columnar.compression

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.AtomicType

/**
 * Tests for the [[RunLengthEncoding]] compression scheme, registered once per supported
 * column type.
 */
class RunLengthEncodingSuite extends SparkFunSuite {
  testRunLengthEncoding(new NoopColumnStats, BOOLEAN)
  testRunLengthEncoding(new ByteColumnStats, BYTE)
  testRunLengthEncoding(new ShortColumnStats, SHORT)
  testRunLengthEncoding(new IntColumnStats, INT)
  testRunLengthEncoding(new LongColumnStats, LONG)
  testRunLengthEncoding(new StringColumnStats, STRING)

  /**
   * Registers the run-length-encoding test cases for one column type.
   *
   * @param columnStats statistics collector handed to the test column builder
   * @param columnType  column type whose values are encoded and decoded
   */
  def testRunLengthEncoding[T <: AtomicType](
      columnStats: ColumnStats,
      columnType: NativeColumnType[T]): Unit = {

    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")

    /**
     * Builds a run-length-encoded column from `inputRuns` and verifies the encoded bytes and
     * the decoder output.
     *
     * @param uniqueValueCount number of unique values / single-value rows to generate
     * @param inputRuns        `(value index, run length)` pairs, in append order
     */
    def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]): Unit = {
      // ---- Encoder ----

      val columnBuilder =
        TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
      val (uniqueValues, singleValueRows) =
        makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)

      // Expands every run into `run length` copies of its value index
      val expandedIndices = for {
        (valueIndex, runLength) <- inputRuns
        _ <- 0 until runLength
      } yield valueIndex

      for (valueIndex <- expandedIndices) {
        columnBuilder.appendFrom(singleValueRows(valueIndex), 0)
      }
      val encoded = columnBuilder.build()

      // Column type ID + null count + null positions
      val headerSize = CompressionScheme.columnHeaderSize(encoded)

      // Each run stores its value followed by a 4-byte run length
      val runSizes = inputRuns.map { case (valueIndex, _) =>
        columnType.actualSize(singleValueRows(valueIndex), 0) + 4
      }
      // 4 bytes for the compression scheme type ID precede the runs
      val compressedSize = 4 + runSizes.sum

      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(encoded.capacity)

      // Moves past the column header
      encoded.position(headerSize)
      assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(encoded.getInt())

      for ((valueIndex, runLength) <- inputRuns) {
        assertResult(uniqueValues(valueIndex), "Wrong column element value") {
          columnType.extract(encoded)
        }
        assertResult(runLength, "Wrong run length")(encoded.getInt())
      }

      // ---- Decoder ----

      // Back to the start, then past the column header and the 4-byte compression scheme ID
      encoded.rewind().position(headerSize + 4)

      val decoder = RunLengthEncoding.decoder(encoded, columnType)
      val reusedRow = new GenericMutableRow(1)

      for (valueIndex <- expandedIndices) {
        assert(decoder.hasNext)
        assertResult(uniqueValues(valueIndex), "Wrong decoded value") {
          decoder.next(reusedRow, 0)
          columnType.getField(reusedRow, 0)
        }
      }

      assert(!decoder.hasNext)
    }

    test(s"$RunLengthEncoding with $typeName: empty column") {
      skeleton(0, Seq.empty)
    }

    test(s"$RunLengthEncoding with $typeName: simple case") {
      skeleton(2, Seq(0 -> 2, 1 -> 2))
    }

    test(s"$RunLengthEncoding with $typeName: run length == 1") {
      skeleton(2, Seq(0 -> 1, 1 -> 1))
    }

    test(s"$RunLengthEncoding with $typeName: single long run") {
      skeleton(1, Seq(0 -> 1000))
    }
  }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
