Repository: spark
Updated Branches:
  refs/heads/master c9f840046 -> e749f5ded


[SPARK-4191][SQL]move wrapperFor to HiveInspectors to reuse it

Move wrapperFor from InsertIntoHiveTable to HiveInspectors so that it can be reused: this
method is also needed when writing data through an ObjectInspector (e.g. for ORC support).

Author: wangfei <[email protected]>
Author: scwf <[email protected]>

Closes #3057 from scwf/reuse-wraperfor and squashes the following commits:

7ccf932 [scwf] fix conflicts
d44f4da [wangfei] fix imports
9bf1b50 [wangfei] revert no related change
9a5276a [wangfei] move wrapfor to hiveinspector to reuse them


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e749f5de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e749f5de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e749f5de

Branch: refs/heads/master
Commit: e749f5dedbad412430b86e7290085095f8dec0d1
Parents: c9f8400
Author: wangfei <[email protected]>
Authored: Sun Nov 2 15:45:55 2014 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Sun Nov 2 15:45:55 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/HiveInspectors.scala  | 47 +++++++++++++++++++-
 .../hive/execution/InsertIntoHiveTable.scala    | 44 +-----------------
 2 files changed, 48 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e749f5de/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 1e2bf5c..58815da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector._
@@ -114,6 +114,51 @@ private[hive] trait HiveInspectors {
           unwrap(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
   }
 
+  /**
+   * Wraps with Hive types based on object inspector.
+   * Returns a converter from Catalyst values to the Java objects that `oi` expects: for the Java
+   * varchar/decimal inspectors, Strings become HiveVarchar and Decimals go through
+   * HiveShim.createDecimal; standard structs, lists and maps are converted recursively; any other
+   * value is passed through unchanged. The converters are NOT null-safe (e.g. the varchar one
+   * calls `.size` on its input), so callers must null-check values, including nested elements.
+   * TODO: Consolidate all hive OI/data interface code.
+   */
+  protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
+    case _: JavaHiveVarcharObjectInspector =>
+      (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
+
+    case _: JavaHiveDecimalObjectInspector =>
+      (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toBigDecimal.underlying())
+
+    case soi: StandardStructObjectInspector =>
+      val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
+      (o: Any) => {
+        val struct = soi.create()
+        (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
+          (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+        }
+        struct
+      }
+
+    case loi: ListObjectInspector =>
+      val wrapper = wrapperFor(loi.getListElementObjectInspector)
+      (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
+
+    case moi: MapObjectInspector =>
+      // The Predef.Map is scala.collection.immutable.Map.
+      // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
+      import scala.collection.Map
+
+      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
+      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
+      (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
+        keyWrapper(key) -> valueWrapper(value)
+      })
+
+    case _ =>
+      identity[Any]
+  }
+
   /**
    * Converts native catalyst types to the types expected by Hive
    * @param a the value to be wrapped

http://git-wip-us.apache.org/repos/asf/spark/blob/e749f5de/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 92bc1c6..74b4e7a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.common.`type`.HiveVarchar
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
@@ -52,7 +52,7 @@ case class InsertIntoHiveTable(
     child: SparkPlan,
     overwrite: Boolean)
     (@transient sc: HiveContext)
-  extends UnaryNode with Command {
+  extends UnaryNode with Command with HiveInspectors {
 
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -68,46 +68,6 @@ case class InsertIntoHiveTable(
 
   def output = child.output
 
-  /**
-   * Wraps with Hive types based on object inspector.
-   * TODO: Consolidate all hive OI/data interface code.
-   */
-  protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
-    case _: JavaHiveVarcharObjectInspector =>
-      (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
-
-    case _: JavaHiveDecimalObjectInspector =>
-      (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toBigDecimal.underlying())
-
-    case soi: StandardStructObjectInspector =>
-      val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
-      (o: Any) => {
-        val struct = soi.create()
-        (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
-          (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
-        }
-        struct
-      }
-
-    case loi: ListObjectInspector =>
-      val wrapper = wrapperFor(loi.getListElementObjectInspector)
-      (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
-
-    case moi: MapObjectInspector =>
-      // The Predef.Map is scala.collection.immutable.Map.
-      // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
-      import scala.collection.Map
-
-      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
-      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
-      (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
-        keyWrapper(key) -> valueWrapper(value)
-      })
-
-    case _ =>
-      identity[Any]
-  }
-
   def saveAsHiveFile(
       rdd: RDD[Row],
       valueClass: Class[_],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to