http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
deleted file mode 100644
index 6b74014..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.test.SQLTestData._
-
-class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
-  import testImplicits._
-
-  private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize
-  private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning
-
-  override protected def beforeAll(): Unit = {
-    super.beforeAll()
-    // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
-    sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
-
-    val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
-      val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
-      TestData(key, string)
-    }, 5).toDF()
-    pruningData.registerTempTable("pruningData")
-
-    // Enable in-memory partition pruning
-    sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
-    // Enable in-memory table scan accumulators
-    sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
-    sqlContext.cacheTable("pruningData")
-  }
-
-  override protected def afterAll(): Unit = {
-    try {
-      sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
-      sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-      sqlContext.uncacheTable("pruningData")
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  // Comparisons
-  checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1))
-  checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1))
-  checkBatchPruning("SELECT key FROM pruningData WHERE key < 12", 1, 2)(1 to 11)
-  checkBatchPruning("SELECT key FROM pruningData WHERE key <= 11", 1, 2)(1 to 11)
-  checkBatchPruning("SELECT key FROM pruningData WHERE key > 88", 1, 2)(89 to 100)
-  checkBatchPruning("SELECT key FROM pruningData WHERE key >= 89", 1, 2)(89 to 100)
-  checkBatchPruning("SELECT key FROM pruningData WHERE 12 > key", 1, 2)(1 to 11)
-  checkBatchPruning("SELECT key FROM pruningData WHERE 11 >= key", 1, 2)(1 to 11)
-  checkBatchPruning("SELECT key FROM pruningData WHERE 88 < key", 1, 2)(89 to 100)
-  checkBatchPruning("SELECT key FROM pruningData WHERE 89 <= key", 1, 2)(89 to 100)
-
-  // IS NULL
-  checkBatchPruning("SELECT key FROM pruningData WHERE value IS NULL", 5, 5) {
-    (1 to 10) ++ (21 to 30) ++ (41 to 50) ++ (61 to 70) ++ (81 to 90)
-  }
-
-  // IS NOT NULL
-  checkBatchPruning("SELECT key FROM pruningData WHERE value IS NOT NULL", 5, 5) {
-    (11 to 20) ++ (31 to 40) ++ (51 to 60) ++ (71 to 80) ++ (91 to 100)
-  }
-
-  // Conjunction and disjunction
-  checkBatchPruning("SELECT key FROM pruningData WHERE key > 8 AND key <= 21", 2, 3)(9 to 21)
-  checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR key > 99", 2, 2)(Seq(1, 100))
-  checkBatchPruning("SELECT key FROM pruningData WHERE key < 12 AND key IS NOT NULL", 1, 2)(1 to 11)
-  checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR (key > 78 AND key < 92)", 3, 4) {
-    Seq(1) ++ (79 to 91)
-  }
-  checkBatchPruning("SELECT key FROM pruningData WHERE NOT (key < 88)", 1, 2) {
-    // Although the `NOT` operator isn't supported directly, the optimizer can transform
-    // `NOT (a < b)` to `b >= a`
-    88 to 100
-  }
-
-  // With unsupported predicate
-  {
-    val seq = (1 to 30).mkString(", ")
-    checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq))", 5, 10)(31 to 100)
-    checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq)) AND key > 88", 1, 2) {
-      89 to 100
-    }
-  }
-
-  def checkBatchPruning(
-      query: String,
-      expectedReadPartitions: Int,
-      expectedReadBatches: Int)(
-      expectedQueryResult: => Seq[Int]): Unit = {
-
-    test(query) {
-      val df = sql(query)
-      val queryExecution = df.queryExecution
-
-      assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
-        df.collect().map(_(0)).toArray
-      }
-
-      val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
-        case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
-      }.head
-
-      assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
-      assert(
-        readPartitions === expectedReadPartitions,
-        s"Wrong number of read partitions: $queryExecution")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
deleted file mode 100644
index 9a2948c..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.columnar.{BOOLEAN, NoopColumnStats}
-
-class BooleanBitSetSuite extends SparkFunSuite {
-  import BooleanBitSet._
-
-  def skeleton(count: Int) {
-    // -------------
-    // Tests encoder
-    // -------------
-
-    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
-    val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN))
-    val values = rows.map(_.getBoolean(0))
-
-    rows.foreach(builder.appendFrom(_, 0))
-    val buffer = builder.build()
-
-    // Column type ID + null count + null positions
-    val headerSize = CompressionScheme.columnHeaderSize(buffer)
-
-    // Compression scheme ID + element count + bitset words
-    val compressedSize = 4 + 4 + {
-      val extra = if (count % BITS_PER_LONG == 0) 0 else 1
-      (count / BITS_PER_LONG + extra) * 8
-    }
-
-    // 4 extra bytes for compression scheme type ID
-    assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
-    // Skips column header
-    buffer.position(headerSize)
-    assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
-    assertResult(count, "Wrong element count")(buffer.getInt())
-
-    var word = 0: Long
-    for (i <- 0 until count) {
-      val bit = i % BITS_PER_LONG
-      word = if (bit == 0) buffer.getLong() else word
-      assertResult(values(i), s"Wrong value in compressed buffer, index=$i") {
-        (word & ((1: Long) << bit)) != 0
-      }
-    }
-
-    // -------------
-    // Tests decoder
-    // -------------
-
-    // Rewinds, skips column header and 4 more bytes for compression scheme ID
-    buffer.rewind().position(headerSize + 4)
-
-    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
-    val mutableRow = new GenericMutableRow(1)
-    if (values.nonEmpty) {
-      values.foreach {
-        assert(decoder.hasNext)
-        assertResult(_, "Wrong decoded value") {
-          decoder.next(mutableRow, 0)
-          mutableRow.getBoolean(0)
-        }
-      }
-    }
-    assert(!decoder.hasNext)
-  }
-
-  test(s"$BooleanBitSet: empty") {
-    skeleton(0)
-  }
-
-  test(s"$BooleanBitSet: less than 1 word") {
-    skeleton(BITS_PER_LONG - 1)
-  }
-
-  test(s"$BooleanBitSet: exactly 1 word") {
-    skeleton(BITS_PER_LONG)
-  }
-
-  test(s"$BooleanBitSet: multiple whole words") {
-    skeleton(BITS_PER_LONG * 2)
-  }
-
-  test(s"$BooleanBitSet: multiple words and 1 more bit") {
-    skeleton(BITS_PER_LONG * 2 + 1)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
deleted file mode 100644
index acfab65..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.columnar._
-import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.types.AtomicType
-
-class DictionaryEncodingSuite extends SparkFunSuite {
-  testDictionaryEncoding(new IntColumnStats, INT)
-  testDictionaryEncoding(new LongColumnStats, LONG)
-  testDictionaryEncoding(new StringColumnStats, STRING)
-
-  def testDictionaryEncoding[T <: AtomicType](
-      columnStats: ColumnStats,
-      columnType: NativeColumnType[T]) {
-
-    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
-
-    def buildDictionary(buffer: ByteBuffer) = {
-      (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
-    }
-
-    def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) {
-      Seq.empty
-    } else {
-      seq.head +: seq.tail.filterNot(_ == seq.head)
-    }
-
-    def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) {
-      // -------------
-      // Tests encoder
-      // -------------
-
-      val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
-      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
-      val dictValues = stableDistinct(inputSeq)
-
-      inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
-
-      if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
-        withClue("Dictionary overflowed, compression should fail") {
-          intercept[Throwable] {
-            builder.build()
-          }
-        }
-      } else {
-        val buffer = builder.build()
-        val headerSize = CompressionScheme.columnHeaderSize(buffer)
-        // 4 extra bytes for dictionary size
-        val dictionarySize = 4 + rows.map(columnType.actualSize(_, 0)).sum
-        // 2 bytes for each `Short`
-        val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
-        // 4 extra bytes for compression scheme type ID
-        assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
-        // Skips column header
-        buffer.position(headerSize)
-        assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
-        val dictionary = buildDictionary(buffer).toMap
-
-        dictValues.foreach { i =>
-          assertResult(i, "Wrong dictionary entry") {
-            dictionary(values(i))
-          }
-        }
-
-        inputSeq.foreach { i =>
-          assertResult(i.toShort, "Wrong column element value")(buffer.getShort())
-        }
-
-        // -------------
-        // Tests decoder
-        // -------------
-
-        // Rewinds, skips column header and 4 more bytes for compression scheme ID
-        buffer.rewind().position(headerSize + 4)
-
-        val decoder = DictionaryEncoding.decoder(buffer, columnType)
-        val mutableRow = new GenericMutableRow(1)
-
-        if (inputSeq.nonEmpty) {
-          inputSeq.foreach { i =>
-            assert(decoder.hasNext)
-            assertResult(values(i), "Wrong decoded value") {
-              decoder.next(mutableRow, 0)
-              columnType.getField(mutableRow, 0)
-            }
-          }
-        }
-
-        assert(!decoder.hasNext)
-      }
-    }
-
-    test(s"$DictionaryEncoding with $typeName: empty") {
-      skeleton(0, Seq.empty)
-    }
-
-    test(s"$DictionaryEncoding with $typeName: simple case") {
-      skeleton(2, Seq(0, 1, 0, 1))
-    }
-
-    test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
-      skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
deleted file mode 100644
index 2111e9f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.columnar._
-import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.types.IntegralType
-
-class IntegralDeltaSuite extends SparkFunSuite {
-  testIntegralDelta(new IntColumnStats, INT, IntDelta)
-  testIntegralDelta(new LongColumnStats, LONG, LongDelta)
-
-  def testIntegralDelta[I <: IntegralType](
-      columnStats: ColumnStats,
-      columnType: NativeColumnType[I],
-      scheme: CompressionScheme) {
-
-    def skeleton(input: Seq[I#InternalType]) {
-      // -------------
-      // Tests encoder
-      // -------------
-
-      val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme)
-      val deltas = if (input.isEmpty) {
-        Seq.empty[Long]
-      } else {
-        (input.tail, input.init).zipped.map {
-          case (x: Int, y: Int) => (x - y).toLong
-          case (x: Long, y: Long) => x - y
-        }
-      }
-
-      input.map { value =>
-        val row = new GenericMutableRow(1)
-        columnType.setField(row, 0, value)
-        builder.appendFrom(row, 0)
-      }
-
-      val buffer = builder.build()
-      // Column type ID + null count + null positions
-      val headerSize = CompressionScheme.columnHeaderSize(buffer)
-
-      // Compression scheme ID + compressed contents
-      val compressedSize = 4 + (if (deltas.isEmpty) {
-        0
-      } else {
-        val oneBoolean = columnType.defaultSize
-        1 + oneBoolean + deltas.map {
-          d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
-        }.sum
-      })
-
-      // 4 extra bytes for compression scheme type ID
-      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
-      buffer.position(headerSize)
-      assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
-      if (input.nonEmpty) {
-        assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
-        assertResult(input.head, "The first value is wrong")(columnType.extract(buffer))
-
-        (input.tail, deltas).zipped.foreach { (value, delta) =>
-          if (math.abs(delta) <= Byte.MaxValue) {
-            assertResult(delta, "Wrong delta")(buffer.get())
-          } else {
-            assertResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
-            assertResult(value, "Wrong value")(columnType.extract(buffer))
-          }
-        }
-      }
-
-      // -------------
-      // Tests decoder
-      // -------------
-
-      // Rewinds, skips column header and 4 more bytes for compression scheme ID
-      buffer.rewind().position(headerSize + 4)
-
-      val decoder = scheme.decoder(buffer, columnType)
-      val mutableRow = new GenericMutableRow(1)
-
-      if (input.nonEmpty) {
-        input.foreach{
-          assert(decoder.hasNext)
-          assertResult(_, "Wrong decoded value") {
-            decoder.next(mutableRow, 0)
-            columnType.getField(mutableRow, 0)
-          }
-        }
-      }
-      assert(!decoder.hasNext)
-    }
-
-    test(s"$scheme: empty column") {
-      skeleton(Seq.empty)
-    }
-
-    test(s"$scheme: simple case") {
-      val input = columnType match {
-        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
-        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
-      }
-
-      skeleton(input.map(_.asInstanceOf[I#InternalType]))
-    }
-
-    test(s"$scheme: long random series") {
-      // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here.
-      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
-      skeleton(input.map(_.asInstanceOf[I#InternalType]))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
deleted file mode 100644
index 67ec08f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.columnar._
-import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.types.AtomicType
-
-class RunLengthEncodingSuite extends SparkFunSuite {
-  testRunLengthEncoding(new NoopColumnStats, BOOLEAN)
-  testRunLengthEncoding(new ByteColumnStats, BYTE)
-  testRunLengthEncoding(new ShortColumnStats, SHORT)
-  testRunLengthEncoding(new IntColumnStats, INT)
-  testRunLengthEncoding(new LongColumnStats, LONG)
-  testRunLengthEncoding(new StringColumnStats, STRING)
-
-  def testRunLengthEncoding[T <: AtomicType](
-      columnStats: ColumnStats,
-      columnType: NativeColumnType[T]) {
-
-    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
-
-    def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) {
-      // -------------
-      // Tests encoder
-      // -------------
-
-      val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
-      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
-      val inputSeq = inputRuns.flatMap { case (index, run) =>
-        Seq.fill(run)(index)
-      }
-
-      inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
-      val buffer = builder.build()
-
-      // Column type ID + null count + null positions
-      val headerSize = CompressionScheme.columnHeaderSize(buffer)
-
-      // Compression scheme ID + compressed contents
-      val compressedSize = 4 + inputRuns.map { case (index, _) =>
-        // 4 extra bytes each run for run length
-        columnType.actualSize(rows(index), 0) + 4
-      }.sum
-
-      // 4 extra bytes for compression scheme type ID
-      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
-      // Skips column header
-      buffer.position(headerSize)
-      assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
-      inputRuns.foreach { case (index, run) =>
-        assertResult(values(index), "Wrong column element value")(columnType.extract(buffer))
-        assertResult(run, "Wrong run length")(buffer.getInt())
-      }
-
-      // -------------
-      // Tests decoder
-      // -------------
-
-      // Rewinds, skips column header and 4 more bytes for compression scheme ID
-      buffer.rewind().position(headerSize + 4)
-
-      val decoder = RunLengthEncoding.decoder(buffer, columnType)
-      val mutableRow = new GenericMutableRow(1)
-
-      if (inputSeq.nonEmpty) {
-        inputSeq.foreach { i =>
-          assert(decoder.hasNext)
-          assertResult(values(i), "Wrong decoded value") {
-            decoder.next(mutableRow, 0)
-            columnType.getField(mutableRow, 0)
-          }
-        }
-      }
-
-      assert(!decoder.hasNext)
-    }
-
-    test(s"$RunLengthEncoding with $typeName: empty column") {
-      skeleton(0, Seq.empty)
-    }
-
-    test(s"$RunLengthEncoding with $typeName: simple case") {
-      skeleton(2, Seq(0 -> 2, 1 ->2))
-    }
-
-    test(s"$RunLengthEncoding with $typeName: run length == 1") {
-      skeleton(2, Seq(0 -> 1, 1 ->1))
-    }
-
-    test(s"$RunLengthEncoding with $typeName: single long run") {
-      skeleton(1, Seq(0 -> 1000))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
deleted file mode 100644
index 5268dfe..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar.compression
-
-import org.apache.spark.sql.columnar._
-import org.apache.spark.sql.types.AtomicType
-
-class TestCompressibleColumnBuilder[T <: AtomicType](
-    override val columnStats: ColumnStats,
-    override val columnType: NativeColumnType[T],
-    override val schemes: Seq[CompressionScheme])
-  extends NativeColumnBuilder(columnStats, columnType)
-  with NullableColumnBuilder
-  with CompressibleColumnBuilder[T] {
-
-  override protected def isWorthCompressing(encoder: Encoder[T]) = true
-}
-
-object TestCompressibleColumnBuilder {
-  def apply[T <: AtomicType](
-      columnStats: ColumnStats,
-      columnType: NativeColumnType[T],
-      scheme: CompressionScheme): TestCompressibleColumnBuilder[T] = {
-
-    val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
-    builder.initialize(0, "", useCompression = true)
-    builder
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala
new file mode 100644
index 0000000..b2d04f7
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types._
+
+class ColumnStatsSuite extends SparkFunSuite {
+  testColumnStats(classOf[BooleanColumnStats], BOOLEAN, createRow(true, false, 0))
+  testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0))
+  testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0))
+  testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0))
+  testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0))
+  testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0))
+  testColumnStats(classOf[DoubleColumnStats], DOUBLE,
+    createRow(Double.MaxValue, Double.MinValue, 0))
+  testColumnStats(classOf[StringColumnStats], STRING, createRow(null, null, 0))
+  testDecimalColumnStats(createRow(null, null, 0))
+
+  def createRow(values: Any*): GenericInternalRow = new GenericInternalRow(values.toArray)
+
+  def testColumnStats[T <: AtomicType, U <: ColumnStats](
+      columnStatsClass: Class[U],
+      columnType: NativeColumnType[T],
+      initialStatistics: GenericInternalRow): Unit = {
+
+    val columnStatsName = columnStatsClass.getSimpleName
+
+    test(s"$columnStatsName: empty") {
+      val columnStats = columnStatsClass.newInstance()
+      columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach {
+        case (actual, expected) => assert(actual === expected)
+      }
+    }
+
+    test(s"$columnStatsName: non-empty") {
+      import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+
+      val columnStats = columnStatsClass.newInstance()
+      val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1))
+      rows.foreach(columnStats.gatherStats(_, 0))
+
+      val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType])
+      val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]]
+      val stats = columnStats.collectedStatistics
+
+      assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0))
+      assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1))
+      assertResult(10, "Wrong null count")(stats.values(2))
+      assertResult(20, "Wrong row count")(stats.values(3))
+      assertResult(stats.values(4), "Wrong size in bytes") {
+        rows.map { row =>
+          if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0)
+        }.sum
+      }
+    }
+  }
+
+  def testDecimalColumnStats[T <: AtomicType, U <: ColumnStats](
+      initialStatistics: GenericInternalRow): Unit = {
+
+    val columnStatsName = classOf[DecimalColumnStats].getSimpleName
+    val columnType = COMPACT_DECIMAL(15, 10)
+
+    test(s"$columnStatsName: empty") {
+      val columnStats = new DecimalColumnStats(15, 10)
+      columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach {
+        case (actual, expected) => assert(actual === expected)
+      }
+    }
+
+    test(s"$columnStatsName: non-empty") {
+      import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+
+      val columnStats = new DecimalColumnStats(15, 10)
+      val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1))
+      rows.foreach(columnStats.gatherStats(_, 0))
+
+      val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType])
+      val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]]
+      val stats = columnStats.collectedStatistics
+
+      assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0))
+      assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1))
+      assertResult(10, "Wrong null count")(stats.values(2))
+      assertResult(20, "Wrong row count")(stats.values(3))
+      assertResult(stats.values(4), "Wrong size in bytes") {
+        rows.map { row =>
+          if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0)
+        }.sum
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
new file mode 100644
index 0000000..34dd969
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, 
GenericMutableRow}
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.types._
+import org.apache.spark.{Logging, SparkFunSuite}
+
+
+/**
+ * Checks the size accounting (`defaultSize`, `actualSize`) and the `append`/`extract`
+ * round trip of the column types listed below, plus which decimal column type
+ * `ColumnType(...)` returns for a given precision.
+ */
+class ColumnTypeSuite extends SparkFunSuite with Logging {
+  private val DEFAULT_BUFFER_SIZE = 512
+  private val MAP_TYPE = MAP(MapType(IntegerType, StringType))
+  private val ARRAY_TYPE = ARRAY(ArrayType(IntegerType))
+  private val STRUCT_TYPE = STRUCT(StructType(StructField("a", StringType) :: Nil))
+
+  test("defaultSize") {
+    // Column type -> the defaultSize it is expected to report.
+    val checks = Map(
+      NULL -> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8,
+      FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12,
+      STRING -> 8, BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 16, MAP_TYPE -> 32)
+
+    checks.foreach { case (columnType, expectedSize) =>
+      assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
+        columnType.defaultSize
+      }
+    }
+  }
+
+  test("actualSize") {
+    // Converts `value` to its Catalyst form, projects it into an unsafe row and checks the
+    // size that `columnType` reports for column 0 of that row.
+    def checkActualSize(
+        columnType: ColumnType[_],
+        value: Any,
+        expected: Int): Unit = {
+
+      assertResult(expected, s"Wrong actualSize for $columnType") {
+        val row = new GenericMutableRow(1)
+        row.update(0, CatalystTypeConverters.convertToCatalyst(value))
+        val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
+        columnType.actualSize(proj(row), 0)
+      }
+    }
+
+    checkActualSize(NULL, null, 0)
+    checkActualSize(BOOLEAN, true, 1)
+    checkActualSize(BYTE, Byte.MaxValue, 1)
+    checkActualSize(SHORT, Short.MaxValue, 2)
+    checkActualSize(INT, Int.MaxValue, 4)
+    checkActualSize(LONG, Long.MaxValue, 8)
+    checkActualSize(FLOAT, Float.MaxValue, 4)
+    checkActualSize(DOUBLE, Double.MaxValue, 8)
+    checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
+    checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
+    checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
+    checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
+    checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
+    checkActualSize(MAP_TYPE, Map(1 -> "a"), 29)
+    checkActualSize(STRUCT_TYPE, Row("hello"), 28)
+  }
+
+  testNativeColumnType(BOOLEAN)
+  testNativeColumnType(BYTE)
+  testNativeColumnType(SHORT)
+  testNativeColumnType(INT)
+  testNativeColumnType(LONG)
+  testNativeColumnType(FLOAT)
+  testNativeColumnType(DOUBLE)
+  testNativeColumnType(COMPACT_DECIMAL(15, 10))
+  testNativeColumnType(STRING)
+
+  testColumnType(NULL)
+  testColumnType(BINARY)
+  testColumnType(LARGE_DECIMAL(20, 10))
+  testColumnType(STRUCT_TYPE)
+  testColumnType(ARRAY_TYPE)
+  testColumnType(MAP_TYPE)
+
+  /** Registers the append/extract test for a native column type. */
+  def testNativeColumnType[T <: AtomicType](columnType: NativeColumnType[T]): Unit = {
+    testColumnType[T#InternalType](columnType)
+  }
+
+  /**
+   * Registers a test that appends four random rows of `columnType` to a byte buffer and
+   * verifies that extracting them again yields equal values.
+   *
+   * The buffer and the random rows are created when the test is registered, not when it runs.
+   */
+  def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = {
+
+    val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ByteOrder.nativeOrder())
+    val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
+    val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
+    // `copy()` each projected row so every element of `seq` is a distinct row object.
+    val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy())
+
+    test(s"$columnType append/extract") {
+      buffer.rewind()
+      seq.foreach(columnType.append(_, 0, buffer))
+
+      buffer.rewind()
+      seq.foreach { row =>
+        logInfo("buffer = " + buffer + ", expected = " + row)
+        val expected = converter(row.get(0, columnType.dataType))
+        val extracted = converter(columnType.extract(buffer))
+        assert(expected === extracted,
+          s"Extracted value didn't equal to the original one. $expected != $extracted, buffer =" +
+          dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer]))
+      }
+    }
+  }
+
+  /**
+   * Renders the bytes between the buffer's position and its limit as space-separated hex
+   * (no zero padding), for use in failure messages. Advances the buffer it is given to its
+   * limit, which is why the caller passes a duplicate.
+   */
+  private def dumpBuffer(buff: ByteBuffer): String = {
+    val remaining = new Array[Byte](buff.remaining())
+    buff.get(remaining)
+    remaining.map(b => Integer.toHexString(b & 0xff)).mkString(" ")
+  }
+
+  test("column type for decimal types with different precision") {
+    // Precisions 1 to 18 are expected to map to COMPACT_DECIMAL ...
+    (1 to 18).foreach { i =>
+      assertResult(COMPACT_DECIMAL(i, 0)) {
+        ColumnType(DecimalType(i, 0))
+      }
+    }
+
+    // ... and precision 19 to LARGE_DECIMAL.
+    assertResult(LARGE_DECIMAL(19, 0)) {
+      ColumnType(DecimalType(19, 0))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
new file mode 100644
index 0000000..9cae65e
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import scala.collection.immutable.HashSet
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
GenericMutableRow}
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData}
+import org.apache.spark.sql.types.{AtomicType, Decimal}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Helpers for the columnar tests: rows made only of nulls, and values/rows filled with
+ * random data for a given [[ColumnType]].
+ */
+object ColumnarTestUtils {
+  /** Builds a row with `length` columns, each of them set to null. */
+  def makeNullRow(length: Int): GenericMutableRow = {
+    val nullRow = new GenericMutableRow(length)
+    for (ordinal <- 0 until length) {
+      nullRow.setNullAt(ordinal)
+    }
+    nullRow
+  }
+
+  /** Produces one random value of the JVM type that `columnType` stores. */
+  def makeRandomValue[JvmType](columnType: ColumnType[JvmType]): JvmType = {
+    def randomByteArray(size: Int): Array[Byte] = {
+      val filled = new Array[Byte](size)
+      Random.nextBytes(filled)
+      filled
+    }
+
+    // The length argument is drawn before the characters, as callers pass `Random.nextInt(..)`.
+    def randomUTF8String(size: Int): UTF8String = UTF8String.fromString(Random.nextString(size))
+
+    val generated: Any = columnType match {
+      case NULL => null
+      case BOOLEAN => Random.nextBoolean()
+      case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
+      case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
+      case INT => Random.nextInt()
+      case LONG => Random.nextLong()
+      case FLOAT => Random.nextFloat()
+      case DOUBLE => Random.nextDouble()
+      case STRING => randomUTF8String(Random.nextInt(32))
+      case BINARY => randomByteArray(Random.nextInt(32))
+      case COMPACT_DECIMAL(precision, scale) =>
+        Decimal(Random.nextLong() % 100, precision, scale)
+      case LARGE_DECIMAL(precision, scale) =>
+        Decimal(Random.nextLong(), precision, scale)
+      case STRUCT(_) =>
+        new GenericInternalRow(Array[Any](randomUTF8String(10)))
+      case ARRAY(_) =>
+        new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt()))
+      case MAP(_) =>
+        val key = Random.nextInt()
+        val value = randomUTF8String(Random.nextInt(32))
+        ArrayBasedMapData(Map(key -> value))
+    }
+
+    generated.asInstanceOf[JvmType]
+  }
+
+  /** Varargs form of `makeRandomValues(Seq)`. */
+  def makeRandomValues(
+      head: ColumnType[_],
+      tail: ColumnType[_]*): Seq[Any] = {
+    val allTypes = Seq(head) ++ tail
+    makeRandomValues(allTypes)
+  }
+
+  /** Produces one random value per column type, in the order of `columnTypes`. */
+  def makeRandomValues(columnTypes: Seq[ColumnType[_]]): Seq[Any] = {
+    for (columnType <- columnTypes) yield makeRandomValue(columnType)
+  }
+
+  /**
+   * Produces `count` pairwise distinct random values of `columnType`. The values are
+   * accumulated in a hash set, so they come back in that set's iteration order.
+   */
+  def makeUniqueRandomValues[JvmType](
+      columnType: ColumnType[JvmType],
+      count: Int): Seq[JvmType] = {
+
+    val distinct = (0 until count).foldLeft(HashSet.empty[JvmType]) { (seen, _) =>
+      // Keep drawing until a value shows up that has not been seen yet.
+      val fresh =
+        Iterator.continually(makeRandomValue(columnType)).filterNot(seen.contains).next()
+      seen + fresh
+    }
+    distinct.toSeq
+  }
+
+  /** Varargs form of `makeRandomRow(Seq)`. */
+  def makeRandomRow(
+      head: ColumnType[_],
+      tail: ColumnType[_]*): InternalRow = {
+    val allTypes = Seq(head) ++ tail
+    makeRandomRow(allTypes)
+  }
+
+  /** Builds a row whose i-th column holds a random value of the i-th column type. */
+  def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
+    val randomRow = new GenericMutableRow(columnTypes.length)
+    val values = makeRandomValues(columnTypes)
+    values.zipWithIndex.foreach { case (value, ordinal) =>
+      randomRow.update(ordinal, value)
+    }
+    randomRow
+  }
+
+  /**
+   * Produces `count` distinct random values of a native column type, together with one
+   * single-column row per value (same order as the values).
+   */
+  def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
+      columnType: NativeColumnType[T],
+      count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {
+
+    val uniqueValues = makeUniqueRandomValues(columnType, count)
+    val singleValueRows = uniqueValues.map { value =>
+      val holder = new GenericMutableRow(1)
+      holder.update(0, value)
+      holder
+    }
+
+    (uniqueValues, singleValueRows)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
new file mode 100644
index 0000000..25afed2
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.SQLTestData._
+import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
+
+/**
+ * Query-level tests over in-memory columnar data: direct `InMemoryRelation` scans, cached
+ * temp tables, and several regression scenarios.
+ */
+class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  setupTestData()
+
+  /**
+   * Checks `query` against `expected`, caches `tableName`, then repeats the same check.
+   * `expected` is by-name, so it is recomputed for each of the two comparisons.
+   */
+  private def checkBeforeAndAfterCaching(query: String, tableName: String)(
+      expected: => Seq[Row]): Unit = {
+    checkAnswer(sql(query), expected)
+    sqlContext.cacheTable(tableName)
+    checkAnswer(sql(query), expected)
+  }
+
+  test("simple columnar query") {
+    val physicalPlan = sqlContext.executePlan(testData.logicalPlan).executedPlan
+    val relation = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, physicalPlan, None)
+
+    checkAnswer(relation, testData.collect().toSeq)
+  }
+
+  test("default size avoids broadcast") {
+    // TODO: Improve this test when we have better statistics
+    val smallData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
+    smallData.toDF().registerTempTable("sizeTst")
+    sqlContext.cacheTable("sizeTst")
+    assert(
+      sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes >
+        sqlContext.conf.autoBroadcastJoinThreshold)
+  }
+
+  test("projection") {
+    val projected = testData.select('value, 'key)
+    val physicalPlan = sqlContext.executePlan(projected.logicalPlan).executedPlan
+    val relation = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, physicalPlan, None)
+
+    // The projection swaps the two columns, so swap them in the expected rows as well.
+    val swapped = testData.collect().map {
+      case Row(key: Int, value: String) => value -> key
+    }
+    checkAnswer(relation, swapped.map(Row.fromTuple))
+  }
+
+  test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
+    val physicalPlan = sqlContext.executePlan(testData.logicalPlan).executedPlan
+    val relation = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, physicalPlan, None)
+
+    // Scan the very same relation twice.
+    checkAnswer(relation, testData.collect().toSeq)
+    checkAnswer(relation, testData.collect().toSeq)
+  }
+
+  test("SPARK-1678 regression: compression must not lose repeated values") {
+    checkBeforeAndAfterCaching("SELECT * FROM repeatedData", "repeatedData") {
+      repeatedData.collect().toSeq.map(Row.fromTuple)
+    }
+  }
+
+  test("with null values") {
+    checkBeforeAndAfterCaching("SELECT * FROM nullableRepeatedData", "nullableRepeatedData") {
+      nullableRepeatedData.collect().toSeq.map(Row.fromTuple)
+    }
+  }
+
+  test("SPARK-2729 regression: timestamp data type") {
+    val timestamps = (0 to 3).map(i => Tuple1(new Timestamp(i))).toDF("time")
+    timestamps.registerTempTable("timestamps")
+
+    checkBeforeAndAfterCaching("SELECT time FROM timestamps", "timestamps") {
+      timestamps.collect().toSeq
+    }
+  }
+
+  test("SPARK-3320 regression: batched column buffer building should work with empty partitions") {
+    checkBeforeAndAfterCaching("SELECT * FROM withEmptyParts", "withEmptyParts") {
+      withEmptyParts.collect().toSeq.map(Row.fromTuple)
+    }
+  }
+
+  test("SPARK-4182 Caching complex types") {
+    complexData.cache().count()
+    // Shouldn't throw
+    complexData.count()
+    complexData.unpersist()
+  }
+
+  test("decimal type") {
+    // Casting is required here because ScalaReflection can't capture decimal precision
+    // information.
+    val decimals = (1 to 10).map(i => Tuple1(Decimal(i, 15, 10)))
+    val df = decimals.toDF("dec").select($"dec" cast DecimalType(15, 10))
+
+    assert(df.schema.head.dataType === DecimalType(15, 10))
+
+    df.cache().registerTempTable("test_fixed_decimal")
+    val expectedRows = (1 to 10).map(i => Row(Decimal(i, 15, 10).toJavaBigDecimal))
+    checkAnswer(sql("SELECT * FROM test_fixed_decimal"), expectedRows)
+  }
+
+  test("test different data types") {
+    // Create the schema.
+    val nestedStruct = StructType(
+      StructField("f1", FloatType, true) ::
+      StructField("f2", ArrayType(BooleanType), true) :: Nil)
+    val columnDataTypes = Seq(
+      StringType, BinaryType, NullType, BooleanType,
+      ByteType, ShortType, IntegerType, LongType,
+      FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+      DateType, TimestampType,
+      ArrayType(IntegerType), MapType(StringType, LongType), nestedStruct)
+    val fields = columnDataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, true)
+    }
+    val allColumns = fields.map(_.name).mkString(",")
+    val schema = StructType(fields)
+
+    // Create a RDD for the schema: one value per column, in the order of the data types above.
+    val rdd =
+      sparkContext.parallelize((1 to 10000), 10).map { i =>
+        Row(
+          s"str${i}: test cache.",
+          s"binary${i}: test cache.".getBytes("UTF-8"),
+          null,
+          i % 2 == 0,
+          i.toByte,
+          i.toShort,
+          i,
+          Long.MaxValue - i.toLong,
+          (i + 0.25).toFloat,
+          (i + 0.75),
+          BigDecimal(Long.MaxValue.toString + ".12345"),
+          new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
+          new Date(i),
+          new Timestamp(i * 1000000L),
+          (i to i + 10).toSeq,
+          (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap,
+          Row((i - 0.25).toFloat, Seq(true, false, null)))
+      }
+    val typedData = sqlContext.createDataFrame(rdd, schema)
+    typedData.registerTempTable("InMemoryCache_different_data_types")
+    // Cache the table.
+    sql("cache table InMemoryCache_different_data_types")
+    // Make sure the table is indeed cached.
+    sqlContext.table("InMemoryCache_different_data_types").queryExecution.executedPlan
+    assert(
+      sqlContext.isCached("InMemoryCache_different_data_types"),
+      "InMemoryCache_different_data_types should be cached.")
+    // Issue a query and check the results.
+    val distinctRows =
+      sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types")
+    checkAnswer(
+      distinctRows,
+      sqlContext.table("InMemoryCache_different_data_types").collect())
+    sqlContext.dropTempTable("InMemoryCache_different_data_types")
+  }
+
+  test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") {
+    val df = sqlContext.range(1, 100).selectExpr("id % 10 as id")
+      .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
+    val cached = df.cache()
+    // count triggers the caching action. It should not throw.
+    cached.count()
+
+    // Make sure, the DataFrame is indeed cached.
+    val cacheEntry = sqlContext.cacheManager.lookupCachedData(cached)
+    assert(cacheEntry.nonEmpty)
+
+    // Check result against a freshly built, identical DataFrame.
+    val uncachedEquivalent = sqlContext.range(1, 100).selectExpr("id % 10 as id")
+      .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
+    checkAnswer(cached, uncachedEquivalent)
+
+    // Drop the cache.
+    cached.unpersist()
+  }
+
+  test("SPARK-10859: Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly") {
+    val data = sqlContext.range(10).selectExpr("id", "cast(id as string) as s")
+    data.cache()
+    assert(data.count() === 10)
+    val matching = data.filter($"s" === "3")
+    assert(matching.count() === 1)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
new file mode 100644
index 0000000..35dc9a2
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, 
GenericMutableRow}
+import org.apache.spark.sql.types._
+
+/**
+ * Smallest concrete accessor that stacks [[NullableColumnAccessor]] on top of
+ * [[BasicColumnAccessor]]; it declares no members of its own.
+ */
+class TestNullableColumnAccessor[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType])
+  extends BasicColumnAccessor(buffer, columnType) with NullableColumnAccessor
+
+/** Factory for [[TestNullableColumnAccessor]]. */
+object TestNullableColumnAccessor {
+  /** Wraps `buffer` in a new accessor for `columnType`. */
+  def apply[JvmType](
+      buffer: ByteBuffer,
+      columnType: ColumnType[JvmType]): TestNullableColumnAccessor[JvmType] =
+    new TestNullableColumnAccessor(buffer, columnType)
+}
+
+/**
+ * For each column type listed below, checks that a [[TestNullableColumnAccessor]] reads back
+ * what a [[TestNullableColumnBuilder]] wrote, including null entries.
+ */
+class NullableColumnAccessorSuite extends SparkFunSuite {
+  import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+
+  Seq(
+    NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
+    STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
+    STRUCT(StructType(StructField("a", StringType) :: Nil)),
+    ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType)))
+    .foreach { columnType =>
+      testNullableColumnAccessor(columnType)
+    }
+
+  /** Registers the "empty column" and "access null values" tests for `columnType`. */
+  def testNullableColumnAccessor[JvmType](
+      columnType: ColumnType[JvmType]): Unit = {
+
+    // Singleton objects have a class name ending in '$'; drop it for the test title.
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+    val nullRow = makeNullRow(1)
+
+    test(s"Nullable $typeName column accessor: empty column") {
+      val emptyBuffer = TestNullableColumnBuilder(columnType).build()
+      val accessor = TestNullableColumnAccessor(emptyBuffer, columnType)
+      assert(!accessor.hasNext)
+    }
+
+    test(s"Nullable $typeName column accessor: access null values") {
+      val builder = TestNullableColumnBuilder(columnType)
+      val randomRow = makeRandomRow(columnType)
+      val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
+
+      // Write four (value, null) pairs.
+      for (_ <- 0 until 4) {
+        builder.appendFrom(proj(randomRow), 0)
+        builder.appendFrom(proj(nullRow), 0)
+      }
+
+      val accessor = TestNullableColumnAccessor(builder.build(), columnType)
+      val target = new GenericMutableRow(1)
+      val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
+      val expectedValue = randomRow.get(0, columnType.dataType)
+
+      // Read them back: each pair must be the random value followed by a null.
+      for (_ <- 0 until 4) {
+        assert(accessor.hasNext)
+        accessor.extractTo(target, 0)
+        assert(converter(target.get(0, columnType.dataType))
+          === converter(expectedValue))
+
+        assert(accessor.hasNext)
+        accessor.extractTo(target, 0)
+        assert(target.isNullAt(0))
+      }
+
+      assert(!accessor.hasNext)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
new file mode 100644
index 0000000..93be3e1
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, 
GenericMutableRow}
+import org.apache.spark.sql.types._
+
+/**
+ * Smallest concrete builder that stacks [[NullableColumnBuilder]] on top of a
+ * [[BasicColumnBuilder]] constructed with a fresh `NoopColumnStats`.
+ */
+class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
+  extends BasicColumnBuilder[JvmType](new NoopColumnStats, columnType) with NullableColumnBuilder
+
+/** Factory for [[TestNullableColumnBuilder]]. */
+object TestNullableColumnBuilder {
+  /** Creates a builder for `columnType` and calls `initialize(initialSize)` on it. */
+  def apply[JvmType](
+      columnType: ColumnType[JvmType],
+      initialSize: Int = 0): TestNullableColumnBuilder[JvmType] = {
+    val initialized = new TestNullableColumnBuilder(columnType)
+    initialized.initialize(initialSize)
+    initialized
+  }
+}
+
+/**
+ * For each column type listed below, checks the buffer produced by a
+ * [[TestNullableColumnBuilder]]: null count, null positions and the non-null values.
+ */
+class NullableColumnBuilderSuite extends SparkFunSuite {
+  import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+
+  Seq(
+    BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
+    STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
+    STRUCT(StructType(StructField("a", StringType) :: Nil)),
+    ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType)))
+    .foreach { columnType =>
+      testNullableColumnBuilder(columnType)
+    }
+
+  /** Registers the empty-column, auto-growth and null-values tests for `columnType`. */
+  def testNullableColumnBuilder[JvmType](
+      columnType: ColumnType[JvmType]): Unit = {
+
+    // Singleton objects have a class name ending in '$'; drop it for the test title.
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+    val dataType = columnType.dataType
+    val proj = UnsafeProjection.create(Array[DataType](dataType))
+    val converter = CatalystTypeConverters.createToScalaConverter(dataType)
+
+    test(s"$typeName column builder: empty column") {
+      val built = TestNullableColumnBuilder(columnType).build()
+
+      // The buffer must hold a zero null count and nothing after it.
+      assertResult(0, "Wrong null count")(built.getInt())
+      assert(!built.hasRemaining)
+    }
+
+    test(s"$typeName column builder: buffer size auto growth") {
+      // The builder starts with the default initial size of 0 and must grow as rows arrive.
+      val columnBuilder = TestNullableColumnBuilder(columnType)
+      val randomRow = makeRandomRow(columnType)
+
+      for (_ <- 0 until 4) {
+        columnBuilder.appendFrom(proj(randomRow), 0)
+      }
+
+      val built = columnBuilder.build()
+
+      assertResult(0, "Wrong null count")(built.getInt())
+    }
+
+    test(s"$typeName column builder: null values") {
+      val columnBuilder = TestNullableColumnBuilder(columnType)
+      val randomRow = makeRandomRow(columnType)
+      val nullRow = makeNullRow(1)
+
+      // Alternate value, null, value, null, ... so the nulls land on the odd positions.
+      for (_ <- 0 until 4) {
+        columnBuilder.appendFrom(proj(randomRow), 0)
+        columnBuilder.appendFrom(proj(nullRow), 0)
+      }
+
+      val built = columnBuilder.build()
+
+      assertResult(4, "Wrong null count")(built.getInt())
+
+      // For null positions
+      for (expectedPosition <- 1 to 7 by 2) {
+        assertResult(expectedPosition, "Wrong null position")(built.getInt())
+      }
+
+      // For non-null values
+      val actual = new GenericMutableRow(new Array[Any](1))
+      val expectedValue = converter(randomRow.get(0, dataType))
+      for (_ <- 0 until 4) {
+        columnType.extract(built, actual, 0)
+        assert(converter(actual.get(0, dataType)) === expectedValue,
+          "Extracted value didn't equal to the original one")
+      }
+
+      assert(!built.hasRemaining)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
new file mode 100644
index 0000000..d762f7b
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.SQLTestData._
+
+/**
+ * Checks how many partitions and batches an `InMemoryColumnarTableScan` actually reads for
+ * various predicates over the cached `pruningData` table.
+ */
+class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
+  import testImplicits._
+
+  // Settings to restore in `afterAll`. They are lazy and are forced at the top of
+  // `beforeAll`, before the suite overrides either setting.
+  private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize
+  private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // Force the lazy vals NOW. Without this their first evaluation would happen in
+    // `afterAll`, after the conf has already been changed below, and the "restore" would
+    // simply write the test values back.
+    originalColumnBatchSize
+    originalInMemoryPartitionPruning
+
+    // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
+    sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
+
+    // Keys 1..100; `value` is null for keys 1-10, 21-30, ... and the key as a string otherwise.
+    val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
+      val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
+      TestData(key, string)
+    }, 5).toDF()
+    pruningData.registerTempTable("pruningData")
+
+    // Enable in-memory partition pruning
+    sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
+    // Enable in-memory table scan accumulators
+    // NOTE(review): this setting is not restored in `afterAll` - confirm whether it should be.
+    sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
+    sqlContext.cacheTable("pruningData")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+      sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
+      sqlContext.uncacheTable("pruningData")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  // Comparisons
+  checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1))
+  checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1))
+  checkBatchPruning("SELECT key FROM pruningData WHERE key < 12", 1, 2)(1 to 11)
+  checkBatchPruning("SELECT key FROM pruningData WHERE key <= 11", 1, 2)(1 to 11)
+  checkBatchPruning("SELECT key FROM pruningData WHERE key > 88", 1, 2)(89 to 100)
+  checkBatchPruning("SELECT key FROM pruningData WHERE key >= 89", 1, 2)(89 to 100)
+  checkBatchPruning("SELECT key FROM pruningData WHERE 12 > key", 1, 2)(1 to 11)
+  checkBatchPruning("SELECT key FROM pruningData WHERE 11 >= key", 1, 2)(1 to 11)
+  checkBatchPruning("SELECT key FROM pruningData WHERE 88 < key", 1, 2)(89 to 100)
+  checkBatchPruning("SELECT key FROM pruningData WHERE 89 <= key", 1, 2)(89 to 100)
+
+  // IS NULL
+  checkBatchPruning("SELECT key FROM pruningData WHERE value IS NULL", 5, 5) {
+    (1 to 10) ++ (21 to 30) ++ (41 to 50) ++ (61 to 70) ++ (81 to 90)
+  }
+
+  // IS NOT NULL
+  checkBatchPruning("SELECT key FROM pruningData WHERE value IS NOT NULL", 5, 5) {
+    (11 to 20) ++ (31 to 40) ++ (51 to 60) ++ (71 to 80) ++ (91 to 100)
+  }
+
+  // Conjunction and disjunction
+  checkBatchPruning("SELECT key FROM pruningData WHERE key > 8 AND key <= 21", 2, 3)(9 to 21)
+  checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR key > 99", 2, 2)(Seq(1, 100))
+  checkBatchPruning("SELECT key FROM pruningData WHERE key < 12 AND key IS NOT NULL", 1, 2)(1 to 11)
+  checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR (key > 78 AND key < 92)", 3, 4) {
+    Seq(1) ++ (79 to 91)
+  }
+  checkBatchPruning("SELECT key FROM pruningData WHERE NOT (key < 88)", 1, 2) {
+    // Although the `NOT` operator isn't supported directly, the optimizer can transform
+    // `NOT (a < b)` to `a >= b`
+    88 to 100
+  }
+
+  // With unsupported predicate
+  {
+    val seq = (1 to 30).mkString(", ")
+    checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq))", 5, 10)(31 to 100)
+    checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq)) AND key > 88", 1, 2) {
+      89 to 100
+    }
+  }
+
+  /**
+   * Registers a test, named after `query`, that runs the query against the cached table and
+   * verifies both its result and the scan's read counters.
+   *
+   * @param query SQL text to run; also used as the test name
+   * @param expectedReadPartitions expected value of the scan's `readPartitions` accumulator
+   * @param expectedReadBatches expected value of the scan's `readBatches` accumulator
+   * @param expectedQueryResult expected first-column values, in result order; by-name, so it
+   *                            is only evaluated when the test runs
+   */
+  def checkBatchPruning(
+      query: String,
+      expectedReadPartitions: Int,
+      expectedReadBatches: Int)(
+      expectedQueryResult: => Seq[Int]): Unit = {
+
+    test(query) {
+      val df = sql(query)
+      val queryExecution = df.queryExecution
+
+      assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") {
+        df.collect().map(_(0)).toArray
+      }
+
+      // Read the counters from the first in-memory scan found in the executed plan.
+      val (readPartitions, readBatches) = queryExecution.executedPlan.collect {
+        case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
+      }.head
+
+      assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution")
+      assert(
+        readPartitions === expectedReadPartitions,
+        s"Wrong number of read partitions: $queryExecution")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
new file mode 100644
index 0000000..ccbddef
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}
+
+/**
+ * Checks the buffer layout written by the `BooleanBitSet` compression scheme and that its
+ * decoder reproduces the original values, for column lengths around the `BITS_PER_LONG`
+ * word boundary.
+ */
+class BooleanBitSetSuite extends SparkFunSuite {
+  import BooleanBitSet._
+
+  /**
+   * Compresses `count` boolean rows produced by `makeRandomRow`, verifies the encoded buffer
+   * (capacity, scheme ID, element count, individual bits) and then verifies the decoder.
+   *
+   * @param count number of boolean values to put into the column
+   */
+  def skeleton(count: Int): Unit = {
+    // -------------
+    // Tests encoder
+    // -------------
+
+    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
+    val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN))
+    val values = rows.map(_.getBoolean(0))
+
+    rows.foreach(builder.appendFrom(_, 0))
+    val buffer = builder.build()
+
+    // Column type ID + null count + null positions
+    val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+    // Compression scheme ID (4 bytes) + element count (4 bytes) + bitset words (8 bytes each,
+    // rounding the number of words up when `count` is not a multiple of `BITS_PER_LONG`)
+    val compressedSize = 4 + 4 + {
+      val extra = if (count % BITS_PER_LONG == 0) 0 else 1
+      (count / BITS_PER_LONG + extra) * 8
+    }
+
+    assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+    // Skips column header
+    buffer.position(headerSize)
+    assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
+    assertResult(count, "Wrong element count")(buffer.getInt())
+
+    // A new word is read from the buffer each time the bit index wraps around to 0
+    var word = 0: Long
+    for (i <- 0 until count) {
+      val bit = i % BITS_PER_LONG
+      word = if (bit == 0) buffer.getLong() else word
+      assertResult(values(i), s"Wrong value in compressed buffer, index=$i") {
+        (word & ((1: Long) << bit)) != 0
+      }
+    }
+
+    // -------------
+    // Tests decoder
+    // -------------
+
+    // Rewinds, skips column header and 4 more bytes for compression scheme ID
+    buffer.rewind().position(headerSize + 4)
+
+    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+    val mutableRow = new GenericMutableRow(1)
+    // The element is bound by name on purpose: with placeholder syntax
+    // (`assertResult(_, ...)`) only the last expression of the block becomes the function, so
+    // `assert(decoder.hasNext)` would run once up front (even for an empty column) instead of
+    // once per element.
+    values.foreach { expected =>
+      assert(decoder.hasNext)
+      assertResult(expected, "Wrong decoded value") {
+        decoder.next(mutableRow, 0)
+        mutableRow.getBoolean(0)
+      }
+    }
+    assert(!decoder.hasNext)
+  }
+
+  test(s"$BooleanBitSet: empty") {
+    skeleton(0)
+  }
+
+  test(s"$BooleanBitSet: less than 1 word") {
+    skeleton(BITS_PER_LONG - 1)
+  }
+
+  test(s"$BooleanBitSet: exactly 1 word") {
+    skeleton(BITS_PER_LONG)
+  }
+
+  test(s"$BooleanBitSet: multiple whole words") {
+    skeleton(BITS_PER_LONG * 2)
+  }
+
+  test(s"$BooleanBitSet: multiple words and 1 more bit") {
+    skeleton(BITS_PER_LONG * 2 + 1)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
new file mode 100644
index 0000000..830ca02
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * Checks the buffer layout written by the `DictionaryEncoding` compression scheme, its
+ * decoder, and that building fails on dictionary overflow, for INT, LONG and STRING columns.
+ */
+class DictionaryEncodingSuite extends SparkFunSuite {
+  testDictionaryEncoding(new IntColumnStats, INT)
+  testDictionaryEncoding(new LongColumnStats, LONG)
+  testDictionaryEncoding(new StringColumnStats, STRING)
+
+  /**
+   * Registers the dictionary encoding test cases for one column type.
+   *
+   * @param columnStats statistics collector handed to the column builder
+   * @param columnType column type under test; its simple class name appears in test names
+   */
+  def testDictionaryEncoding[T <: AtomicType](
+      columnStats: ColumnStats,
+      columnType: NativeColumnType[T]): Unit = {
+
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    // Reads the dictionary size, then that many values, mapping each value to the position
+    // (as a `Short`) at which it was read
+    def buildDictionary(buffer: ByteBuffer) = {
+      (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
+    }
+
+    // Distinct elements of `seq` in order of first occurrence. (The previous hand-rolled
+    // version only dropped repetitions of the head, e.g. Seq(0, 1, 0, 1) gave Seq(0, 1, 1).)
+    def stableDistinct(seq: Seq[Int]): Seq[Int] = seq.distinct
+
+    /**
+     * Appends `rows(i)` for every index `i` of `inputSeq`, then verifies the encoded buffer
+     * and the decoder, or, when the number of distinct inputs exceeds `MAX_DICT_SIZE`, that
+     * building the column fails.
+     *
+     * @param uniqueValueCount number of unique values/rows to generate
+     * @param inputSeq indices into the generated rows, in append order
+     */
+    def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]): Unit = {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+      val dictValues = stableDistinct(inputSeq)
+
+      inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
+
+      if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
+        withClue("Dictionary overflowed, compression should fail") {
+          intercept[Throwable] {
+            builder.build()
+          }
+        }
+      } else {
+        val buffer = builder.build()
+        val headerSize = CompressionScheme.columnHeaderSize(buffer)
+        // 4 extra bytes for dictionary size
+        val dictionarySize = 4 + rows.map(columnType.actualSize(_, 0)).sum
+        // 4 bytes for compression scheme type ID, 2 bytes for each `Short`
+        val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
+        assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+        // Skips column header
+        buffer.position(headerSize)
+        assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+        val dictionary = buildDictionary(buffer)
+
+        dictValues.foreach { i =>
+          assertResult(i, "Wrong dictionary entry") {
+            dictionary(values(i))
+          }
+        }
+
+        inputSeq.foreach { i =>
+          assertResult(i.toShort, "Wrong column element value")(buffer.getShort())
+        }
+
+        // -------------
+        // Tests decoder
+        // -------------
+
+        // Rewinds, skips column header and 4 more bytes for compression scheme ID
+        buffer.rewind().position(headerSize + 4)
+
+        val decoder = DictionaryEncoding.decoder(buffer, columnType)
+        val mutableRow = new GenericMutableRow(1)
+
+        inputSeq.foreach { i =>
+          assert(decoder.hasNext)
+          assertResult(values(i), "Wrong decoded value") {
+            decoder.next(mutableRow, 0)
+            columnType.getField(mutableRow, 0)
+          }
+        }
+
+        assert(!decoder.hasNext)
+      }
+    }
+
+    test(s"$DictionaryEncoding with $typeName: empty") {
+      skeleton(0, Seq.empty)
+    }
+
+    test(s"$DictionaryEncoding with $typeName: simple case") {
+      skeleton(2, Seq(0, 1, 0, 1))
+    }
+
+    test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
+      skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
new file mode 100644
index 0000000..988a577
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.types.IntegralType
+
+/**
+ * Checks the buffer layout written by the `IntDelta` and `LongDelta` compression schemes and
+ * that their decoders reproduce the original values.
+ */
+class IntegralDeltaSuite extends SparkFunSuite {
+  testIntegralDelta(new IntColumnStats, INT, IntDelta)
+  testIntegralDelta(new LongColumnStats, LONG, LongDelta)
+
+  /**
+   * Registers the delta encoding test cases for one integral column type.
+   *
+   * @param columnStats statistics collector handed to the column builder
+   * @param columnType column type under test (`INT` or `LONG`)
+   * @param scheme delta compression scheme under test; its string form appears in test names
+   */
+  def testIntegralDelta[I <: IntegralType](
+      columnStats: ColumnStats,
+      columnType: NativeColumnType[I],
+      scheme: CompressionScheme): Unit = {
+
+    /**
+     * Appends every value of `input` to a column compressed with `scheme`, then verifies the
+     * encoded buffer and the decoder.
+     */
+    def skeleton(input: Seq[I#InternalType]): Unit = {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme)
+      // Difference between each value and its predecessor (one fewer entry than `input`)
+      val deltas = if (input.isEmpty) {
+        Seq.empty[Long]
+      } else {
+        (input.tail, input.init).zipped.map {
+          case (x: Int, y: Int) => (x - y).toLong
+          case (x: Long, y: Long) => x - y
+        }
+      }
+
+      input.foreach { value =>
+        val row = new GenericMutableRow(1)
+        columnType.setField(row, 0, value)
+        builder.appendFrom(row, 0)
+      }
+
+      val buffer = builder.build()
+      // Column type ID + null count + null positions
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+      // Compression scheme ID (4 bytes) + compressed contents. The check is on `input`, not
+      // `deltas`: a single-element column has no deltas, but the escaping mark and the first
+      // value are still expected in the buffer (see the content assertions below).
+      val compressedSize = 4 + (if (input.isEmpty) {
+        0
+      } else {
+        val valueSize = columnType.defaultSize
+        // Escaping mark + first value, then one byte per small delta or an escaping mark
+        // followed by the full value otherwise
+        1 + valueSize + deltas.map {
+          d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + valueSize
+        }.sum
+      })
+
+      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+      // Skips column header
+      buffer.position(headerSize)
+      assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      if (input.nonEmpty) {
+        assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
+        assertResult(input.head, "The first value is wrong")(columnType.extract(buffer))
+
+        (input.tail, deltas).zipped.foreach { (value, delta) =>
+          if (math.abs(delta) <= Byte.MaxValue) {
+            assertResult(delta, "Wrong delta")(buffer.get())
+          } else {
+            assertResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
+            assertResult(value, "Wrong value")(columnType.extract(buffer))
+          }
+        }
+      }
+
+      // -------------
+      // Tests decoder
+      // -------------
+
+      // Rewinds, skips column header and 4 more bytes for compression scheme ID
+      buffer.rewind().position(headerSize + 4)
+
+      val decoder = scheme.decoder(buffer, columnType)
+      val mutableRow = new GenericMutableRow(1)
+
+      // The element is bound by name on purpose: with placeholder syntax
+      // (`assertResult(_, ...)`) only the last expression of the block becomes the function,
+      // so `assert(decoder.hasNext)` would run once up front instead of once per element.
+      input.foreach { expected =>
+        assert(decoder.hasNext)
+        assertResult(expected, "Wrong decoded value") {
+          decoder.next(mutableRow, 0)
+          columnType.getField(mutableRow, 0)
+        }
+      }
+      assert(!decoder.hasNext)
+    }
+
+    test(s"$scheme: empty column") {
+      skeleton(Seq.empty)
+    }
+
+    test(s"$scheme: simple case") {
+      val input = columnType match {
+        case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
+      }
+
+      skeleton(input.map(_.asInstanceOf[I#InternalType]))
+    }
+
+    test(s"$scheme: long random series") {
+      // Have to workaround with `Any` since no `ClassTag[I#InternalType]` available here.
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[I#InternalType]))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/014c0f7a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
new file mode 100644
index 0000000..ce3affb
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar.compression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.columnar._
+import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.types.AtomicType
+
+/**
+ * Checks the buffer layout written by the `RunLengthEncoding` compression scheme and that its
+ * decoder reproduces the original values, for several column types.
+ */
+class RunLengthEncodingSuite extends SparkFunSuite {
+  testRunLengthEncoding(new NoopColumnStats, BOOLEAN)
+  testRunLengthEncoding(new ByteColumnStats, BYTE)
+  testRunLengthEncoding(new ShortColumnStats, SHORT)
+  testRunLengthEncoding(new IntColumnStats, INT)
+  testRunLengthEncoding(new LongColumnStats, LONG)
+  testRunLengthEncoding(new StringColumnStats, STRING)
+
+  /**
+   * Registers the run length encoding test cases for one column type.
+   *
+   * @param columnStats statistics collector handed to the column builder
+   * @param columnType column type under test; its simple class name appears in test names
+   */
+  def testRunLengthEncoding[T <: AtomicType](
+      columnStats: ColumnStats,
+      columnType: NativeColumnType[T]): Unit = {
+
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    /**
+     * Expands `inputRuns` into a column, compresses it, then verifies the encoded buffer and
+     * the decoder.
+     *
+     * @param uniqueValueCount number of unique values/rows to generate
+     * @param inputRuns `(index, run)` pairs: the generated row at `index` is appended `run`
+     *                  times in a row
+     */
+    def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]): Unit = {
+      // -------------
+      // Tests encoder
+      // -------------
+
+      val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
+      val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+      val inputSeq = inputRuns.flatMap { case (index, run) =>
+        Seq.fill(run)(index)
+      }
+
+      inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
+      val buffer = builder.build()
+
+      // Column type ID + null count + null positions
+      val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+      // Compression scheme ID (4 bytes) + compressed contents
+      val compressedSize = 4 + inputRuns.map { case (index, _) =>
+        // 4 extra bytes each run for run length
+        columnType.actualSize(rows(index), 0) + 4
+      }.sum
+
+      assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+      // Skips column header
+      buffer.position(headerSize)
+      assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+      inputRuns.foreach { case (index, run) =>
+        assertResult(values(index), "Wrong column element value")(columnType.extract(buffer))
+        assertResult(run, "Wrong run length")(buffer.getInt())
+      }
+
+      // -------------
+      // Tests decoder
+      // -------------
+
+      // Rewinds, skips column header and 4 more bytes for compression scheme ID
+      buffer.rewind().position(headerSize + 4)
+
+      val decoder = RunLengthEncoding.decoder(buffer, columnType)
+      val mutableRow = new GenericMutableRow(1)
+
+      inputSeq.foreach { i =>
+        assert(decoder.hasNext)
+        assertResult(values(i), "Wrong decoded value") {
+          decoder.next(mutableRow, 0)
+          columnType.getField(mutableRow, 0)
+        }
+      }
+
+      assert(!decoder.hasNext)
+    }
+
+    test(s"$RunLengthEncoding with $typeName: empty column") {
+      skeleton(0, Seq.empty)
+    }
+
+    test(s"$RunLengthEncoding with $typeName: simple case") {
+      skeleton(2, Seq(0 -> 2, 1 -> 2))
+    }
+
+    test(s"$RunLengthEncoding with $typeName: run length == 1") {
+      skeleton(2, Seq(0 -> 1, 1 -> 1))
+    }
+
+    test(s"$RunLengthEncoding with $typeName: single long run") {
+      skeleton(1, Seq(0 -> 1000))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to