Repository: spark
Updated Branches:
  refs/heads/branch-1.0 32c960a01 -> 2853e56f6


[SPARK-1678][SPARK-1679] In-memory compression bug fix and made compression configurable, disabled by default

In-memory compression is now configurable in `SparkConf` via the `spark.sql.inMemoryColumnarStorage.compressed` property, and is disabled by default.
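
For reference, a minimal usage sketch (hypothetical app and table names; assumes the table has already been registered, e.g. via `registerAsTable`):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Opt in explicitly -- compression is off unless this flag is set.
val conf = new SparkConf()
  .setAppName("compressed-cache-example")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// cacheTable reads the flag from SparkConf and passes `useCompression`
// down to InMemoryColumnarTableScan (see the SQLContext diff below).
sqlContext.cacheTable("someTable")
```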

To help code review, the bug fix is in [the first commit](https://github.com/liancheng/spark/commit/d537a367edf0bf24d0b925cc58b21d805ccbc11f), and the compression configuration is in [the second one](https://github.com/liancheng/spark/commit/4ce09aa8aa820bbbbbaa0f3f084a6cff1d4e6195).

Author: Cheng Lian <lian.cs....@gmail.com>

Closes #608 from liancheng/spark-1678 and squashes the following commits:

66c3a8d [Cheng Lian] Renamed in-memory compression configuration key
f8fb3a0 [Cheng Lian] Added assertion for testing .hasNext of various decoder
4ce09aa [Cheng Lian] Made in-memory compression configurable via SparkConf
d537a36 [Cheng Lian] Fixed SPARK-1678
(cherry picked from commit 6d721c5f7131f7c9fe56c524133d70cb37f1222d)

Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2853e56f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2853e56f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2853e56f

Branch: refs/heads/branch-1.0
Commit: 2853e56f61eb8282596a177412000ddfc1a94af3
Parents: 32c960a
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Mon May 5 19:38:59 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon May 5 19:39:11 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  7 +++--
 .../spark/sql/columnar/ColumnBuilder.scala      | 17 ++++++++---
 .../columnar/InMemoryColumnarTableScan.scala    |  8 ++++--
 .../sql/columnar/NullableColumnBuilder.scala    |  4 +--
 .../CompressibleColumnAccessor.scala            |  4 ++-
 .../compression/CompressibleColumnBuilder.scala | 12 +++++++-
 .../compression/compressionSchemes.scala        |  2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  2 +-
 .../scala/org/apache/spark/sql/TestData.scala   | 11 +++++++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 30 ++++++++++++++++++--
 .../compression/BooleanBitSetSuite.scala        |  7 ++++-
 .../compression/DictionaryEncodingSuite.scala   |  7 +++--
 .../compression/IntegralDeltaSuite.scala        |  7 ++++-
 .../compression/RunLengthEncodingSuite.scala    |  7 +++--
 .../TestCompressibleColumnBuilder.scala         |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  2 +-
 17 files changed, 105 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e25201a..bfebfa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -162,8 +162,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /** Caches the specified table in-memory. */
   def cacheTable(tableName: String): Unit = {
     val currentTable = catalog.lookupRelation(None, tableName)
+    val useCompression =
+      sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+      InMemoryColumnarTableScan(
+        currentTable.output, executePlan(currentTable).executedPlan, useCompression)
 
     catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
   }
@@ -173,7 +176,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e))

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 048ee66..4be048c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -28,7 +28,7 @@ private[sql] trait ColumnBuilder {
   /**
    * Initializes with an approximate lower bound on the expected number of elements in this column.
    */
-  def initialize(initialSize: Int, columnName: String = "")
+  def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
 
   /**
    * Appends `row(ordinal)` to the column builder.
@@ -55,7 +55,11 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
 
   protected var buffer: ByteBuffer = _
 
-  override def initialize(initialSize: Int, columnName: String = "") = {
+  override def initialize(
+      initialSize: Int,
+      columnName: String = "",
+      useCompression: Boolean = false) = {
+
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
 
@@ -130,7 +134,12 @@ private[sql] object ColumnBuilder {
     }
   }
 
-  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
+  def apply(
+      typeId: Int,
+      initialSize: Int = 0,
+      columnName: String = "",
+      useCompression: Boolean = false): ColumnBuilder = {
+
     val builder = (typeId match {
       case INT.typeId     => new IntColumnBuilder
       case LONG.typeId    => new LongColumnBuilder
@@ -144,7 +153,7 @@ private[sql] object ColumnBuilder {
       case GENERIC.typeId => new GenericColumnBuilder
     }).asInstanceOf[ColumnBuilder]
 
-    builder.initialize(initialSize, columnName)
+    builder.initialize(initialSize, columnName, useCompression)
     builder
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 8a24733..fdf28e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -20,8 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row
+import org.apache.spark.SparkConf
 
-private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    child: SparkPlan,
+    useCompression: Boolean)
   extends LeafNode {
 
   override def output: Seq[Attribute] = attributes
@@ -30,7 +34,7 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
     val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
       val columnBuilders = output.map { attribute =>
-        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
+        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
       }.toArray
 
       var row: Row = null

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index 2a3b6fc..d008806 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -40,12 +40,12 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
   private var pos: Int = _
   private var nullCount: Int = _
 
-  abstract override def initialize(initialSize: Int, columnName: String) {
+  abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
     nulls = ByteBuffer.allocate(1024)
     nulls.order(ByteOrder.nativeOrder())
     pos = 0
     nullCount = 0
-    super.initialize(initialSize, columnName)
+    super.initialize(initialSize, columnName, useCompression)
   }
 
   abstract override def appendFrom(row: Row, ordinal: Int) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
index 878cb84..b4120a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
@@ -32,5 +32,7 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc
     decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
   }
 
-  abstract override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
+  abstract override def hasNext = super.hasNext || decoder.hasNext
+
+  override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
 }
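
The `hasNext` override above chains the accessor's state with the decoder's: the column is exhausted only when both the underlying buffer and any in-progress run are drained. A minimal sketch of the Scala stackable-trait mechanism this relies on (hypothetical `Source`/`BufferSource`/`RunAware` names, not Spark code):

```scala
trait Source {
  def hasNext: Boolean
}

// Concrete base: "has more" means bytes remain in the underlying store.
class BufferSource(var remaining: Int) extends Source {
  def hasNext: Boolean = remaining > 0
}

// Stackable mixin: `abstract override` ORs its own pending state
// into whatever implementation it is stacked on top of.
trait RunAware extends Source {
  var pendingInRun: Int = 0
  abstract override def hasNext: Boolean = super.hasNext || pendingInRun > 0
}

object Demo extends App {
  val s = new BufferSource(0) with RunAware
  s.pendingInRun = 2
  println(s.hasNext) // true: buffer is empty, but a run is still pending
}
```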

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 0f808f6..4c6675c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -47,7 +47,17 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
+  var compressionEncoders: Seq[Encoder[T]] = _
+
+  abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
+    compressionEncoders =
+      if (useCompression) {
+        schemes.filter(_.supports(columnType)).map(_.encoder[T])
+      } else {
+        Seq(PassThrough.encoder)
+      }
+    super.initialize(initialSize, columnName, useCompression)
+  }
 
   protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index 800009d..8cf9ec7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -157,7 +157,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
       currentValue
     }
 
-    override def hasNext = buffer.hasRemaining
+    override def hasNext = valueCount < run || buffer.hasRemaining
   }
 }
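
This one-line change is the SPARK-1678 fix: a run-length decoder can still be mid-run after the underlying buffer is drained, so `hasNext` must also consult the current run. A simplified, self-contained sketch of the idea (hypothetical `IntRunLengthDecoder` class; the `run`/`valueCount` names mirror the hunk context above):

```scala
import java.nio.ByteBuffer

// Decodes a stream of (value, runLength) pairs, e.g. 42 x 3, 7 x 2, ...
class IntRunLengthDecoder(buffer: ByteBuffer) {
  private var run = 0          // length of the current run
  private var valueCount = 0   // values of the current run emitted so far
  private var currentValue = 0

  def next(): Int = {
    if (valueCount == run) {
      // Start a new run: read the value and its run length from the buffer.
      currentValue = buffer.getInt()
      run = buffer.getInt()
      valueCount = 1
    } else {
      valueCount += 1
    }
    currentValue
  }

  // The buggy version was `buffer.hasRemaining` alone: after the last
  // (value, runLength) pair is read, the buffer is empty even though up to
  // `run - valueCount` values are still pending, so repeated values were
  // silently dropped from cached columns.
  def hasNext: Boolean = valueCount < run || buffer.hasRemaining
}
```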
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 50124dd..235a9b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -77,7 +77,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case scan @ InMemoryColumnarTableScan(output, child) =>
+        case scan @ InMemoryColumnarTableScan(output, _, _) =>
           scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 002b7f0..b5973c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -73,4 +73,15 @@ object TestData {
       ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) ::
       ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil)
   arrayData.registerAsTable("arrayData")
+
+  case class StringData(s: String)
+  val repeatedData =
+    TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test")))
+  repeatedData.registerAsTable("repeatedData")
+
+  val nullableRepeatedData =
+    TestSQLContext.sparkContext.parallelize(
+      List.fill(2)(StringData(null)) ++
+      List.fill(2)(StringData("test")))
+  nullableRepeatedData.registerAsTable("nullableRepeatedData")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 16a13b8..31c5dfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().toSeq)
   }
 
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,9 +44,33 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed 
multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
   }
+
+  test("SPARK-1678 regression: compression must not lose repeated values") {
+    checkAnswer(
+      sql("SELECT * FROM repeatedData"),
+      repeatedData.collect().toSeq)
+
+    TestSQLContext.cacheTable("repeatedData")
+
+    checkAnswer(
+      sql("SELECT * FROM repeatedData"),
+      repeatedData.collect().toSeq)
+  }
+
+  test("with null values") {
+    checkAnswer(
+      sql("SELECT * FROM nullableRepeatedData"),
+      nullableRepeatedData.collect().toSeq)
+
+    TestSQLContext.cacheTable("nullableRepeatedData")
+
+    checkAnswer(
+      sql("SELECT * FROM nullableRepeatedData"),
+      nullableRepeatedData.collect().toSeq)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
index a754f98..93259a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
@@ -72,7 +72,12 @@ class BooleanBitSetSuite extends FunSuite {
     buffer.rewind().position(headerSize + 4)
 
     val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
-    values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+    if (values.nonEmpty) {
+      values.foreach {
+        assert(decoder.hasNext)
+        expectResult(_, "Wrong decoded value")(decoder.next())
+      }
+    }
     assert(!decoder.hasNext)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
index eab2798..198dcd8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
@@ -98,8 +98,11 @@ class DictionaryEncodingSuite extends FunSuite {
 
         val decoder = DictionaryEncoding.decoder(buffer, columnType)
 
-        inputSeq.foreach { i =>
-          expectResult(values(i), "Wrong decoded value")(decoder.next())
+        if (inputSeq.nonEmpty) {
+          inputSeq.foreach { i =>
+            assert(decoder.hasNext)
+            expectResult(values(i), "Wrong decoded value")(decoder.next())
+          }
         }
 
         assert(!decoder.hasNext)

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
index ce419ca..46af6e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -96,7 +96,12 @@ class IntegralDeltaSuite extends FunSuite {
       buffer.rewind().position(headerSize + 4)
 
       val decoder = scheme.decoder(buffer, columnType)
-      input.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+      if (input.nonEmpty) {
+        input.foreach{
+          assert(decoder.hasNext)
+          expectResult(_, "Wrong decoded value")(decoder.next())
+        }
+      }
       assert(!decoder.hasNext)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
index 89f9b60..d3b73ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
@@ -81,8 +81,11 @@ class RunLengthEncodingSuite extends FunSuite {
 
       val decoder = RunLengthEncoding.decoder(buffer, columnType)
 
-      inputSeq.foreach { i =>
-        expectResult(values(i), "Wrong decoded value")(decoder.next())
+      if (inputSeq.nonEmpty) {
+        inputSeq.foreach { i =>
+          assert(decoder.hasNext)
+          expectResult(values(i), "Wrong decoded value")(decoder.next())
+        }
       }
 
       assert(!decoder.hasNext)

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
index 81bf5e9..6d688ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -38,7 +38,7 @@ object TestCompressibleColumnBuilder {
       scheme: CompressionScheme) = {
 
     val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
-    builder.initialize(0)
+    builder.initialize(0, "", useCompression = true)
     builder
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c90788..ba837a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         castChildOutput(p, table, child)
 
       case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _))), _, child, _) =>
+        _, HiveTableScan(_, table, _), _)), _, child, _) =>
         castChildOutput(p, table, child)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2853e56f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d9a6e0e..b215707 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+        _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }
