Repository: spark
Updated Branches:
  refs/heads/master 78236334e -> 1faa57971


http://git-wip-us.apache.org/repos/asf/spark/blob/1faa5797/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
index 5222a47..d9d1e1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -19,63 +19,71 @@ package org.apache.spark.sql.columnar
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution.SparkSqlSerializer
 
+class TestNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType])
+  extends BasicColumnBuilder[T, JvmType](new NoopColumnStats[T, JvmType], columnType)
+  with NullableColumnBuilder
+
+object TestNullableColumnBuilder {
+  def apply[T <: DataType, JvmType](columnType: ColumnType[T, JvmType], initialSize: Int = 0) = {
+    val builder = new TestNullableColumnBuilder(columnType)
+    builder.initialize(initialSize)
+    builder
+  }
+}
+
 class NullableColumnBuilderSuite extends FunSuite {
-  import ColumnarTestData._
+  import ColumnarTestUtils._
 
   Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
     testNullableColumnBuilder(_)
   }
 
   def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
-    val columnBuilder = ColumnBuilder(columnType.typeId)
     val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
 
     test(s"$typeName column builder: empty column") {
-      columnBuilder.initialize(4)
-
+      val columnBuilder = TestNullableColumnBuilder(columnType)
       val buffer = columnBuilder.build()
 
-      // For column type ID
-      assert(buffer.getInt() === columnType.typeId)
-      // For null count
-      assert(buffer.getInt === 0)
+      expectResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
+      expectResult(0, "Wrong null count")(buffer.getInt())
       assert(!buffer.hasRemaining)
     }
 
     test(s"$typeName column builder: buffer size auto growth") {
-      columnBuilder.initialize(4)
+      val columnBuilder = TestNullableColumnBuilder(columnType)
+      val randomRow = makeRandomRow(columnType)
 
-      (0 until 4) foreach { _ =>
-        columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
+      (0 until 4).foreach { _ =>
+        columnBuilder.appendFrom(randomRow, 0)
       }
 
       val buffer = columnBuilder.build()
 
-      // For column type ID
-      assert(buffer.getInt() === columnType.typeId)
-      // For null count
-      assert(buffer.getInt() === 0)
+      expectResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
+      expectResult(0, "Wrong null count")(buffer.getInt())
     }
 
     test(s"$typeName column builder: null values") {
-      columnBuilder.initialize(4)
+      val columnBuilder = TestNullableColumnBuilder(columnType)
+      val randomRow = makeRandomRow(columnType)
+      val nullRow = makeNullRow(1)
 
-      (0 until 4) foreach { _ =>
-        columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
-        columnBuilder.appendFrom(nullRow, columnType.typeId)
+      (0 until 4).foreach { _ =>
+        columnBuilder.appendFrom(randomRow, 0)
+        columnBuilder.appendFrom(nullRow, 0)
       }
 
       val buffer = columnBuilder.build()
 
-      // For column type ID
-      assert(buffer.getInt() === columnType.typeId)
-      // For null count
-      assert(buffer.getInt() === 4)
+      expectResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
+      expectResult(4, "Wrong null count")(buffer.getInt())
+
       // For null positions
-      (1 to 7 by 2).foreach(i => assert(buffer.getInt() === i))
+      (1 to 7 by 2).foreach(expectResult(_, "Wrong null position")(buffer.getInt()))
 
       // For non-null values
       (0 until 4).foreach { _ =>
@@ -84,7 +92,8 @@ class NullableColumnBuilderSuite extends FunSuite {
         } else {
           columnType.extract(buffer)
         }
-        assert(actual === nonNullRandomRow(columnType.typeId))
+
+        assert(actual === randomRow(0), "Extracted value didn't equal the original one")
       }
 
       assert(!buffer.hasRemaining)
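
For reference, the layout these assertions walk through is [column type ID: Int][null count: Int][null positions: Int*][non-null values]. A minimal, illustrative sketch of that layout in plain java.nio follows; the object name and the Int-only payload are assumptions made for illustration, not the actual ColumnBuilder API:

  import java.nio.ByteBuffer

  // Hypothetical sketch: writes Option[Int] values in the nullable layout
  // asserted above -- [type ID][null count][null positions][non-null values].
  object NullableLayoutSketch {
    def write(typeId: Int, values: Seq[Option[Int]]): ByteBuffer = {
      val nullPositions = values.zipWithIndex.collect { case (None, i) => i }
      val nonNullValues = values.flatten
      val buffer =
        ByteBuffer.allocate(8 + 4 * nullPositions.size + 4 * nonNullValues.size)
      buffer.putInt(typeId)                    // column type ID
      buffer.putInt(nullPositions.size)        // null count
      nullPositions.foreach(buffer.putInt(_))  // positions of the null rows
      nonNullValues.foreach(buffer.putInt(_))  // only the non-null values
      buffer.flip()
      buffer
    }
  }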

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa5797/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
new file mode 100644
index 0000000..184691a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar.compression
+
+import java.nio.ByteBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types.NativeType
+import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+
+class DictionaryEncodingSuite extends FunSuite {
+  testDictionaryEncoding(new IntColumnStats,    INT)
+  testDictionaryEncoding(new LongColumnStats,   LONG)
+  testDictionaryEncoding(new StringColumnStats, STRING)
+
+  def testDictionaryEncoding[T <: NativeType](
+      columnStats: NativeColumnStats[T],
+      columnType: NativeColumnType[T]) {
+
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    def buildDictionary(buffer: ByteBuffer) = {
+      (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
+    }
+
+    test(s"$DictionaryEncoding with $typeName: simple case") {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
+
+      builder.initialize(0)
+      builder.appendFrom(rows(0), 0)
+      builder.appendFrom(rows(1), 0)
+      builder.appendFrom(rows(0), 0)
+      builder.appendFrom(rows(1), 0)
+
+      val buffer = builder.build()
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      // 4 extra bytes for dictionary size
+      val dictionarySize = 4 + values.map(columnType.actualSize).sum
+      // 4 `Short`s, 2 bytes each
+      val compressedSize = dictionarySize + 2 * 4
+      // 4 extra bytes for compression scheme type ID
+      expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+      // Skips column header
+      buffer.position(headerSize)
+      expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      val dictionary = buildDictionary(buffer)
+      Array[Short](0, 1).foreach { i =>
+        expectResult(i, "Wrong dictionary entry")(dictionary(values(i)))
+      }
+
+      Array[Short](0, 1, 0, 1).foreach {
+        expectResult(_, "Wrong column element value")(buffer.getShort())
+      }
+
+      // -------------
+      // Tests decoder
+      // -------------
+
+      // Rewinds, skips column header and 4 more bytes for compression scheme ID
+      buffer.rewind().position(headerSize + 4)
+
+      val decoder = new DictionaryEncoding.Decoder[T](buffer, columnType)
+
+      Array[Short](0, 1, 0, 1).foreach { i =>
+        expectResult(values(i), "Wrong decoded value")(decoder.next())
+      }
+
+      assert(!decoder.hasNext)
+    }
+  }
+
+  test(s"$DictionaryEncoding: overflow") {
+    val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, DictionaryEncoding)
+    builder.initialize(0)
+
+    (0 to Short.MaxValue).foreach { n =>
+      val row = new GenericMutableRow(1)
+      row.setInt(0, n)
+      builder.appendFrom(row, 0)
+    }
+
+    withClue("Dictionary overflowed, encoding should fail") {
+      intercept[Throwable] {
+        builder.build()
+      }
+    }
+  }
+}
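
The buffer exercised here is laid out as [column header][scheme ID: Int][dictionary size: Int][dictionary entries][one Short code per element], and a dictionary can hold at most Short.MaxValue entries -- which is exactly what the overflow test relies on. A hedged sketch of the decode side, where `extract` is a stand-in for columnType.extract (an assumption of this sketch, not the suite's API):

  import java.nio.ByteBuffer

  // Illustrative only: decodes a dictionary-encoded body positioned just
  // past the scheme ID, assuming [dictionary size][entries][Short codes].
  def decodeDictionary[A](body: ByteBuffer, extract: ByteBuffer => A): Seq[A] = {
    // Entries are stored in code order, so an entry's index is its code.
    val dictionary = (0 until body.getInt()).map(_ => extract(body))
    val decoded = Seq.newBuilder[A]
    while (body.hasRemaining) {
      decoded += dictionary(body.getShort())  // each element is a Short code
    }
    decoded.result()
  }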

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa5797/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
new file mode 100644
index 0000000..2089ad1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar.compression
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types.NativeType
+import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
+
+class RunLengthEncodingSuite extends FunSuite {
+  testRunLengthEncoding(new BooleanColumnStats, BOOLEAN)
+  testRunLengthEncoding(new ByteColumnStats,    BYTE)
+  testRunLengthEncoding(new ShortColumnStats,   SHORT)
+  testRunLengthEncoding(new IntColumnStats,     INT)
+  testRunLengthEncoding(new LongColumnStats,    LONG)
+  testRunLengthEncoding(new StringColumnStats,  STRING)
+
+  def testRunLengthEncoding[T <: NativeType](
+      columnStats: NativeColumnStats[T],
+      columnType: NativeColumnType[T]) {
+
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    test(s"$RunLengthEncoding with $typeName: simple case") {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
+
+      builder.initialize(0)
+      builder.appendFrom(rows(0), 0)
+      builder.appendFrom(rows(0), 0)
+      builder.appendFrom(rows(1), 0)
+      builder.appendFrom(rows(1), 0)
+
+      val buffer = builder.build()
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      // 4 extra bytes per run for the run length
+      val compressedSize = values.map(columnType.actualSize(_) + 4).sum
+      // 4 extra bytes for compression scheme type ID
+      expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+      // Skips column header
+      buffer.position(headerSize)
+      expectResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      Array(0, 1).foreach { i =>
+        expectResult(values(i), "Wrong column element value")(columnType.extract(buffer))
+        expectResult(2, "Wrong run length")(buffer.getInt())
+      }
+
+      // -------------
+      // Tests decoder
+      // -------------
+
+      // Rewinds, skips column header and 4 more bytes for compression scheme ID
+      buffer.rewind().position(headerSize + 4)
+
+      val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
+
+      Array(0, 0, 1, 1).foreach { i =>
+        expectResult(values(i), "Wrong decoded value")(decoder.next())
+      }
+
+      assert(!decoder.hasNext)
+    }
+
+    test(s"$RunLengthEncoding with $typeName: run length == 1") {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
+
+      builder.initialize(0)
+      builder.appendFrom(rows(0), 0)
+      builder.appendFrom(rows(1), 0)
+
+      val buffer = builder.build()
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+      // 4 bytes per run for the run length
+      val compressedSize = values.map(columnType.actualSize(_) + 4).sum
+      // 4 bytes for compression scheme type ID
+      expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+      // Skips column header
+      buffer.position(headerSize)
+      expectResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      Array(0, 1).foreach { i =>
+        expectResult(values(i), "Wrong column element value")(columnType.extract(buffer))
+        expectResult(1, "Wrong run length")(buffer.getInt())
+      }
+
+      // -------------
+      // Tests decoder
+      // -------------
+
+      // Rewinds, skips column header and 4 more bytes for compression scheme ID
+      buffer.rewind().position(headerSize + 4)
+
+      val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
+
+      Array(0, 1).foreach { i =>
+        expectResult(values(i), "Wrong decoded value")(decoder.next())
+      }
+
+      assert(!decoder.hasNext)
+    }
+  }
+}
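
Here the compressed body is just a sequence of [value][run length: Int] pairs, verified above for runs of 2 and of 1. Under the same assumption (`extract` standing in for columnType.extract), a minimal decode sketch:

  import java.nio.ByteBuffer

  // Illustrative only: expands a run-length-encoded body positioned just
  // past the scheme ID back into individual values.
  def decodeRunLength[A](body: ByteBuffer, extract: ByteBuffer => A): Seq[A] = {
    val decoded = Seq.newBuilder[A]
    while (body.hasRemaining) {
      val value = extract(body)  // the run's value
      val run = body.getInt()    // number of consecutive occurrences
      (0 until run).foreach(_ => decoded += value)
    }
    decoded.result()
  }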

http://git-wip-us.apache.org/repos/asf/spark/blob/1faa5797/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
new file mode 100644
index 0000000..e0ec812
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar.compression
+
+import org.apache.spark.sql.catalyst.types.NativeType
+import org.apache.spark.sql.columnar._
+
+class TestCompressibleColumnBuilder[T <: NativeType](
+    override val columnStats: NativeColumnStats[T],
+    override val columnType: NativeColumnType[T],
+    override val schemes: Seq[CompressionScheme])
+  extends NativeColumnBuilder(columnStats, columnType)
+  with NullableColumnBuilder
+  with CompressibleColumnBuilder[T] {
+
+  override protected def isWorthCompressing(encoder: Encoder) = true
+}
+
+object TestCompressibleColumnBuilder {
+  def apply[T <: NativeType](
+      columnStats: NativeColumnStats[T],
+      columnType: NativeColumnType[T],
+      scheme: CompressionScheme) = {
+
+    new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
+  }
+}
+
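
TestCompressibleColumnBuilder stacks NullableColumnBuilder and CompressibleColumnBuilder onto NativeColumnBuilder and forces isWorthCompressing to true, so the single scheme passed in is always applied regardless of its estimated compression ratio. Typical usage, mirroring the suites above (the row set-up is elided):

  // Pin one scheme and always compress, so the encoder under test runs
  // deterministically instead of falling back to an uncompressed layout.
  val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, RunLengthEncoding)
  builder.initialize(0)
  // ... append rows via builder.appendFrom(row, 0) ...
  val buffer = builder.build()  // [column header][scheme type ID][compressed body]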
