Repository: spark
Updated Branches:
  refs/heads/master 037fe4d2b -> 7dbca68e9


[BUGFIX] In-memory columnar storage bug fixes

Fixed several bugs in the in-memory columnar storage so that 
`HiveInMemoryCompatibilitySuite` passes.

@rxin @marmbrus It would be reasonable to include `HiveInMemoryCompatibilitySuite` in 
this PR, but I left it out because it significantly increases test execution time. 
What do you think?

**UPDATE** `HiveCompatibilitySuite` now caches tables in memory, so the separate 
`HiveInMemoryCompatibilitySuite` was removed.

Author: Cheng Lian <lian.cs....@gmail.com>
Author: Michael Armbrust <mich...@databricks.com>

Closes #374 from liancheng/inMemBugFix and squashes the following commits:

6ad6d9b [Cheng Lian] Merged HiveCompatibilitySuite and 
HiveInMemoryCompatibilitySuite
5bdbfe7 [Cheng Lian] Revert 882c538 & 8426ddc, which introduced regression
882c538 [Cheng Lian] Remove attributes field from InMemoryColumnarTableScan
32cc9ce [Cheng Lian] Code style cleanup
99382bf [Cheng Lian] Enable compression by default
4390bcc [Cheng Lian] Report error for any Throwable in HiveComparisonTest
d1df4fd [Michael Armbrust] Remove test tables that might always get created 
anyway?
ab9e807 [Michael Armbrust] Fix the logged console version of failed test cases 
to use the new syntax.
1965123 [Michael Armbrust] Don't use coalesce for gathering all data to a 
single partition, as it does not work correctly with mutable rows.
e36cdd0 [Michael Armbrust] Spelling.
2d0e168 [Michael Armbrust] Run Hive tests in-memory too.
6360723 [Cheng Lian] Made PreInsertionCasts support SparkLogicalPlan and 
InMemoryColumnarTableScan
c9b0f6f [Cheng Lian] Let InsertIntoTable support InMemoryColumnarTableScan
9c8fc40 [Cheng Lian] Disable compression by default
e619995 [Cheng Lian] Bug fix: incorrect byte order in 
CompressionScheme.columnHeaderSize
8426ddc [Cheng Lian] Bug fix: InMemoryColumnarTableScan should cache columns 
specified by the attributes argument
036cd09 [Cheng Lian] Clean up unused imports
44591a5 [Cheng Lian] Bug fix: NullableColumnAccessor.hasNext must take nulls 
into account
052bf41 [Cheng Lian] Bug fix: should only gather compressibility info for 
non-null values
95b3301 [Cheng Lian] Fixed bugs in IntegralDelta


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7dbca68e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7dbca68e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7dbca68e

Branch: refs/heads/master
Commit: 7dbca68e92416ec5f023c8807bb06470c01a6d3a
Parents: 037fe4d
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Mon Apr 14 15:22:43 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Mon Apr 14 15:22:43 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  2 +-
 .../sql/columnar/NullableColumnAccessor.scala   |  2 +
 .../compression/CompressibleColumnBuilder.scala |  4 +-
 .../compression/CompressionScheme.scala         |  4 +-
 .../compression/compressionSchemes.scala        | 20 +++-----
 .../apache/spark/sql/execution/Exchange.scala   |  9 +++-
 .../apache/spark/sql/execution/SparkPlan.scala  |  4 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |  5 +-
 .../spark/sql/columnar/ColumnarQuerySuite.scala | 42 ----------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 52 ++++++++++++++++++++
 .../columnar/NullableColumnAccessorSuite.scala  |  4 ++
 .../compression/IntegralDeltaSuite.scala        | 15 ++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 42 ++++++++++------
 .../apache/spark/sql/hive/HiveStrategies.scala  |  4 ++
 .../org/apache/spark/sql/hive/TestHive.scala    | 10 ++--
 .../org/apache/spark/sql/hive/hiveUdfs.scala    | 16 +++---
 .../sql/hive/execution/HiveComparisonTest.scala | 10 +---
 .../hive/execution/HiveCompatibilitySuite.scala | 12 ++++-
 18 files changed, 150 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 16da7fd..9150041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -99,7 +99,7 @@ class SchemaRDD(
   def baseSchemaRDD = this
 
   // 
=========================================================================================
-  // RDD functions: Copy the interal row representation so we present 
immutable data to users.
+  // RDD functions: Copy the internal row representation so we present 
immutable data to users.
   // 
=========================================================================================
 
   override def compute(split: Partition, context: TaskContext): Iterator[Row] =

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
index 7d49ab0..b7f8826 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends 
ColumnAccessor {
 
     pos += 1
   }
+
+  abstract override def hasNext = seenNulls < nullCount || super.hasNext
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index fd3b1ad..0f808f6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   abstract override def appendFrom(row: Row, ordinal: Int) {
     super.appendFrom(row, ordinal)
-    gatherCompressibilityStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      gatherCompressibilityStats(row, ordinal)
+    }
   }
 
   abstract override def build() = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index c605a8e..ba1810d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.columnar.compression
 
-import java.nio.ByteBuffer
+import java.nio.{ByteOrder, ByteBuffer}
 
 import org.apache.spark.sql.catalyst.types.NativeType
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
@@ -84,7 +84,7 @@ private[sql] object CompressionScheme {
   }
 
   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
-    val header = columnBuffer.duplicate()
+    val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
     val nullCount = header.getInt(4)
     // Column type ID + null count + null positions
     4 + 4 + 4 * nullCount

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index e92cf5a..800009d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -397,26 +397,27 @@ private[sql] sealed abstract class IntegralDelta[I <: 
IntegralType] extends Comp
 
       if (initial) {
         initial = false
-        prev = value
         _compressedSize += 1 + columnType.defaultSize
       } else {
         val (smallEnough, _) = byteSizedDelta(value, prev)
         _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
       }
+
+      prev = value
     }
 
     override def compress(from: ByteBuffer, to: ByteBuffer, columnType: 
NativeColumnType[I]) = {
       to.putInt(typeId)
 
       if (from.hasRemaining) {
-        val prev = columnType.extract(from)
-
+        var prev = columnType.extract(from)
         to.put(Byte.MinValue)
         columnType.append(prev, to)
 
         while (from.hasRemaining) {
           val current = columnType.extract(from)
           val (smallEnough, delta) = byteSizedDelta(current, prev)
+          prev = current
 
           if (smallEnough) {
             to.put(delta)
@@ -443,13 +444,8 @@ private[sql] sealed abstract class IntegralDelta[I <: 
IntegralType] extends Comp
 
     override def next() = {
       val delta = buffer.get()
-
-      if (delta > Byte.MinValue) {
-        addDelta(prev, delta)
-      } else {
-        prev = columnType.extract(buffer)
-        prev
-      }
+      prev = if (delta > Byte.MinValue) addDelta(prev, delta) else 
columnType.extract(buffer)
+      prev
     }
 
     override def hasNext = buffer.hasRemaining
@@ -465,7 +461,7 @@ private[sql] case object IntDelta extends 
IntegralDelta[IntegerType.type] {
 
   override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: 
Byte)
   }
 }
 
@@ -478,6 +474,6 @@ private[sql] case object LongDelta extends 
IntegralDelta[LongType.type] {
 
   override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
     val delta = x - y
-    if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+    if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: 
Byte)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 450c142..070557e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: 
SparkPlan) extends Una
         shuffled.map(_._1)
 
       case SinglePartition =>
-        child.execute().coalesce(1, shuffle = true)
+        val rdd = child.execute().mapPartitions { iter =>
+          val mutablePair = new MutablePair[Null, Row]()
+          iter.map(r => mutablePair.update(null, r))
+        }
+        val partitioner = new HashPartitioner(1)
+        val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, 
partitioner)
+        shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+        shuffled.map(_._2)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index daa423c..5d89697 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -70,8 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => 
ExistingRdd(output.map(_.newInstance), rdd)
-        case InMemoryColumnarTableScan(output, child) =>
-          InMemoryColumnarTableScan(output.map(_.newInstance), child)
+        case scan @ InMemoryColumnarTableScan(output, child) =>
+          scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 7c6a642..0331f90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.scalatest.FunSuite
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext
 
 class CachedTableSuite extends QueryTest {
   TestData // Load test tables.

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
deleted file mode 100644
index 2ed4cf2..0000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.columnar
-
-import org.apache.spark.sql.{QueryTest, TestData}
-import org.apache.spark.sql.execution.SparkLogicalPlan
-import org.apache.spark.sql.test.TestSQLContext
-
-class ColumnarQuerySuite extends QueryTest {
-  import TestData._
-  import TestSQLContext._
-
-  test("simple columnar query") {
-    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
-
-    checkAnswer(scan, testData.collect().toSeq)
-  }
-
-  test("SPARK-1436 regression: in-memory columns must be able to be accessed 
multiple times") {
-    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
-
-    checkAnswer(scan, testData.collect().toSeq)
-    checkAnswer(scan, testData.collect().toSeq)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
new file mode 100644
index 0000000..16a13b8
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.sql.{QueryTest, TestData}
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext
+
+class InMemoryColumnarQuerySuite extends QueryTest {
+  import TestData._
+  import TestSQLContext._
+
+  test("simple columnar query") {
+    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().toSeq)
+  }
+
+  test("projection") {
+    val plan = TestSQLContext.executePlan(testData.select('value, 
'key).logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().map {
+      case Row(key: Int, value: String) => value -> key
+    }.toSeq)
+  }
+
+  test("SPARK-1436 regression: in-memory columns must be able to be accessed 
multiple times") {
+    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().toSeq)
+    checkAnswer(scan, testData.collect().toSeq)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index 4a21eb6..35ab14c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite {
       val row = new GenericMutableRow(1)
 
       (0 until 4).foreach { _ =>
+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row(0) === randomRow(0))
 
+        assert(accessor.hasNext)
         accessor.extractTo(row, 0)
         assert(row.isNullAt(0))
       }
+
+      assert(!accessor.hasNext)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
index 1390e5e..ce419ca 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.IntegralType
 import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
 
 class IntegralDeltaSuite extends FunSuite {
   testIntegralDelta(new IntColumnStats,  INT,  IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
       } else {
         val oneBoolean = columnType.defaultSize
         1 + oneBoolean + deltas.map {
-          d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+          d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
         }.sum
       })
 
@@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite {
         expectResult(input.head, "The first value is 
wrong")(columnType.extract(buffer))
 
         (input.tail, deltas).zipped.foreach { (value, delta) =>
-          if (delta < Byte.MaxValue) {
+          if (math.abs(delta) <= Byte.MaxValue) {
             expectResult(delta, "Wrong delta")(buffer.get())
           } else {
             expectResult(Byte.MinValue, "Expecting escaping mark 
here")(buffer.get())
@@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite {
 
     test(s"$scheme: simple case") {
       val input = columnType match {
-        case INT  => Seq(1: Int,  2: Int,  130: Int)
-        case LONG => Seq(1: Long, 2: Long, 130: Long)
+        case INT  => Seq(2: Int,  1: Int,  2: Int,  130: Int)
+        case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
       }
 
       skeleton(input.map(_.asInstanceOf[I#JvmType]))
     }
+
+    test(s"$scheme: long random series") {
+      // Have to workaround with `Any` since no `ClassTag[I#JvmType]` 
available here.
+      val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+      skeleton(input.map(_.asInstanceOf[I#JvmType]))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fc053c5..c36b587 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends 
Catalog with Logging {
       case p: LogicalPlan if !p.childrenResolved => p
 
       case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
-        val childOutputDataTypes = child.output.map(_.dataType)
-        // Only check attributes, not partitionKeys since they are always 
strings.
-        // TODO: Fully support inserting into partitioned tables.
-        val tableOutputDataTypes = table.attributes.map(_.dataType)
-
-        if (childOutputDataTypes == tableOutputDataTypes) {
-          p
-        } else {
-          // Only do the casting when child output data types differ from 
table output data types.
-          val castedChildOutput = child.output.zip(table.output).map {
-            case (input, output) if input.dataType != output.dataType =>
-              Alias(Cast(input, output.dataType), input.name)()
-            case (input, _) => input
-          }
-
-          p.copy(child = logical.Project(castedChildOutput, child))
+        castChildOutput(p, table, child)
+
+      case p @ 
logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+        _, HiveTableScan(_, table, _))), _, child, _) =>
+        castChildOutput(p, table, child)
+    }
+
+    def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: 
LogicalPlan) = {
+      val childOutputDataTypes = child.output.map(_.dataType)
+      // Only check attributes, not partitionKeys since they are always 
strings.
+      // TODO: Fully support inserting into partitioned tables.
+      val tableOutputDataTypes = table.attributes.map(_.dataType)
+
+      if (childOutputDataTypes == tableOutputDataTypes) {
+        p
+      } else {
+        // Only do the casting when child output data types differ from table 
output data types.
+        val castedChildOutput = child.output.zip(table.output).map {
+          case (input, output) if input.dataType != output.dataType =>
+            Alias(Cast(input, output.dataType), input.name)()
+          case (input, _) => input
         }
+
+        p.copy(child = logical.Project(castedChildOutput, child))
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3ca1d93..ac817b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
 
 trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -42,6 +43,9 @@ trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, 
overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), 
overwrite)(hiveContext) :: Nil
+      case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+        _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+        InsertIntoHiveTable(table, partition, planLater(child), 
overwrite)(hiveContext) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 2fea970..465e5f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -160,12 +160,6 @@ class TestHiveContext(sc: SparkContext) extends 
LocalHiveContext(sc) {
     TestTable("src1",
       "CREATE TABLE src1 (key INT, value STRING)".cmd,
       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO 
TABLE src1".cmd),
-    TestTable("dest1",
-      "CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd),
-    TestTable("dest2",
-      "CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd),
-    TestTable("dest3",
-      "CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
     TestTable("srcpart", () => {
       runSqlHive(
         "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds 
STRING, hr STRING)")
@@ -257,6 +251,7 @@ class TestHiveContext(sc: SparkContext) extends 
LocalHiveContext(sc) {
 
   private val loadedTables = new collection.mutable.HashSet[String]
 
+  var cacheTables: Boolean = false
   def loadTestTable(name: String) {
     if (!(loadedTables contains name)) {
       // Marks the table as loaded first to prevent infite mutually recursive 
table loading.
@@ -265,6 +260,9 @@ class TestHiveContext(sc: SparkContext) extends 
LocalHiveContext(sc) {
       val createCmds =
         testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown 
test table $name"))
       createCmds.foreach(_())
+
+      if (cacheTables)
+        cacheTable(name)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index f9b437d..55a4363 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -130,8 +130,7 @@ trait HiveFunctionFactory {
   }
 }
 
-abstract class HiveUdf
-    extends Expression with Logging with HiveFunctionFactory {
+abstract class HiveUdf extends Expression with Logging with 
HiveFunctionFactory {
   self: Product =>
 
   type UDFType
@@ -146,7 +145,7 @@ abstract class HiveUdf
   lazy val functionInfo = getFunctionInfo(name)
   lazy val function = createFunction[UDFType](name)
 
-  override def toString = 
s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})"
+  override def toString = 
s"$nodeName#${functionInfo.getDisplayName}(${children.mkString(",")})"
 }
 
 case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends 
HiveUdf {
@@ -202,10 +201,11 @@ case class HiveSimpleUdf(name: String, children: 
Seq[Expression]) extends HiveUd
   }
 }
 
-case class HiveGenericUdf(
-    name: String,
-    children: Seq[Expression]) extends HiveUdf with HiveInspectors {
+case class HiveGenericUdf(name: String, children: Seq[Expression])
+  extends HiveUdf with HiveInspectors {
+
   import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
+
   type UDFType = GenericUDF
 
   @transient
@@ -357,7 +357,7 @@ case class HiveGenericUdaf(
 
   override def toString = s"$nodeName#$name(${children.mkString(",")})"
 
-  def newInstance = new HiveUdafFunction(name, children, this)
+  def newInstance() = new HiveUdafFunction(name, children, this)
 }
 
 /**
@@ -435,7 +435,7 @@ case class HiveGenericUdtf(
     }
   }
 
-  override def toString() = s"$nodeName#$name(${children.mkString(",")})"
+  override def toString = s"$nodeName#$name(${children.mkString(",")})"
 }
 
 case class HiveUdafFunction(

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 3cc4562..6c91f40 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -218,10 +218,7 @@ abstract class HiveComparisonTest
         val quotes = "\"\"\""
         queryList.zipWithIndex.map {
           case (query, i) =>
-            s"""
-              |val q$i = $quotes$query$quotes.q
-              |q$i.stringResult()
-            """.stripMargin
+            s"""val q$i = hql($quotes$query$quotes); q$i.collect()"""
         }.mkString("\n== Console version of this test ==\n", "\n", "\n")
       }
 
@@ -287,7 +284,6 @@ abstract class HiveComparisonTest
                         |Error: ${e.getMessage}
                         |${stackTraceToString(e)}
                         |$queryString
-                        |$consoleTestCase
                       """.stripMargin
                     stringToFile(
                       new File(hiveFailedDirectory, testCaseName),
@@ -304,7 +300,7 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case 
(queryString, hive) =>
           val query = new TestHive.HiveQLQueryExecution(queryString)
           try { (query, prepareAnswer(query, query.stringResult())) } catch {
-            case e: Exception =>
+            case e: Throwable =>
               val errorMessage =
                 s"""
                   |Failed to execute query using catalyst:
@@ -313,8 +309,6 @@ abstract class HiveComparisonTest
                   |$query
                   |== HIVE - ${hive.size} row(s) ==
                   |${hive.mkString("\n")}
-                  |
-                  |$consoleTestCase
                 """.stripMargin
               stringToFile(new File(failedDirectory, testCaseName), 
errorMessage + consoleTestCase)
               fail(errorMessage)

http://git-wip-us.apache.org/repos/asf/spark/blob/7dbca68e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f76e16b..c3cfa3d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -17,16 +17,26 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.sql.hive.TestHive
 
 /**
  * Runs the test cases that are included in the hive distribution.
  */
-class HiveCompatibilitySuite extends HiveQueryFileTest {
+class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   // TODO: bundle in jar files... get from classpath
   lazy val hiveQueryDir = 
TestHive.getHiveFile("ql/src/test/queries/clientpositive")
   def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") 
-> f)
 
+  override def beforeAll() {
+    TestHive.cacheTables = true
+  }
+
+  override def afterAll() {
+    TestHive.cacheTables = false
+  }
+
   /** A list of tests deemed out of scope currently and thus completely 
disregarded. */
   override def blackList = Seq(
     // These tests use hooks that are not on the classpath and thus break all 
subsequent execution.

Reply via email to