Repository: incubator-livy
Updated Branches:
  refs/heads/master 39fa887cf -> 47d3ee6b6


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala
new file mode 100644
index 0000000..e555b34
--- /dev/null
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/serde/ThriftResultSet.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.serde
+
+import java.nio.ByteBuffer
+import java.util
+
+import com.google.common.primitives.{Booleans, Bytes, Doubles, Ints, Longs, 
Shorts}
+import org.apache.hive.service.rpc.thrift.{TBinaryColumn, TBoolColumn, 
TByteColumn, TColumn, TDoubleColumn, TI16Column, TI32Column, TI64Column, 
TProtocolVersion, TRow, TRowSet, TStringColumn}
+import org.apache.hive.service.rpc.thrift.TProtocolVersion._
+
+import org.apache.livy.thriftserver.session.{ColumnBuffer, DataType, ResultSet}
+import org.apache.livy.thriftserver.types.Schema
+
/**
 * Abstract representation of a result set to be transferred to a client over the
 * Hive Thrift protocol. Implementations accumulate rows and convert them into a
 * Thrift [[TRowSet]].
 */
abstract class ThriftResultSet {
  /** Converts the accumulated data into a Thrift [[TRowSet]] to be sent to the client. */
  def toTRowSet: TRowSet

  /** Appends one row of values to this result set. */
  def addRow(row: Array[Any]): Unit

  /** Sets the offset of the first row of this set within the overall query result. */
  def setRowOffset(rowOffset: Long): Unit

  /** Number of rows currently stored in this result set. */
  def numRows: Int
}
+
object ThriftResultSet {

  /**
   * Creates a [[ThriftResultSet]] storing values for columns of the given `types`.
   *
   * @param types column data types, one per column of the result set
   * @param protocolVersion Thrift protocol version negotiated with the client
   * @throws IllegalArgumentException if the protocol version predates column-oriented
   *                                  result sets (HIVE_CLI_SERVICE_PROTOCOL_V6)
   */
  def apply(types: Array[DataType], protocolVersion: TProtocolVersion): ThriftResultSet = {
    // Older versions of the Hive protocol require the result set to be returned in a
    // row-oriented way. We do not support those versions, so it is useless to implement it.
    // `require` (rather than `assert`) keeps this precondition active even when assertions
    // are disabled and reports a meaningful error to the caller.
    require(protocolVersion.getValue >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue,
      s"Unsupported Thrift protocol version: $protocolVersion. Only column-oriented " +
        "result sets (protocol V6 and later) are supported.")
    new ColumnOrientedResultSet(types.map(new ColumnBuffer(_)))
  }

  /** Creates a [[ThriftResultSet]] for the column types declared in `schema`. */
  def apply(schema: Schema, protocolVersion: TProtocolVersion): ThriftResultSet = {
    apply(schema.fields.map(_.fieldType.dataType), protocolVersion)
  }

  /** Wraps the column buffers of an existing [[ResultSet]] without copying them. */
  def apply(resultSet: ResultSet): ThriftResultSet = {
    new ColumnOrientedResultSet(resultSet.getColumns)
  }

  /**
   * Converts a [[ColumnBuffer]] into the matching Thrift [[TColumn]].
   *
   * The buffer's null bitmap is passed through unchanged; the values are copied into
   * the Thrift column variant corresponding to the buffer's data type. Types without
   * a dedicated Thrift column (e.g. complex types) are transferred as strings.
   */
  def toTColumn(columnBuffer: ColumnBuffer): TColumn = {
    val value = new TColumn
    val nullMasks = ByteBuffer.wrap(columnBuffer.getNulls.toByteArray)
    columnBuffer.getType match {
      case DataType.BOOLEAN =>
        val bools = columnBuffer.getValues.asInstanceOf[Array[Boolean]]
        value.setBoolVal(new TBoolColumn(Booleans.asList(bools: _*), nullMasks))
      case DataType.BYTE =>
        val bytes = columnBuffer.getValues.asInstanceOf[Array[Byte]]
        value.setByteVal(new TByteColumn(Bytes.asList(bytes: _*), nullMasks))
      case DataType.SHORT =>
        val shorts = columnBuffer.getValues.asInstanceOf[Array[Short]]
        value.setI16Val(new TI16Column(Shorts.asList(shorts: _*), nullMasks))
      case DataType.INTEGER =>
        val integers = columnBuffer.getValues.asInstanceOf[Array[Int]]
        value.setI32Val(new TI32Column(Ints.asList(integers: _*), nullMasks))
      case DataType.LONG =>
        val longs = columnBuffer.getValues.asInstanceOf[Array[Long]]
        value.setI64Val(new TI64Column(Longs.asList(longs: _*), nullMasks))
      case DataType.FLOAT =>
        // There is no TFloatColumn in the protocol, so floats are widened to doubles.
        // Going through the string representation (instead of a plain `toDouble`)
        // preserves the float's decimal rendering: "1.1".toDouble is 1.1, whereas
        // 1.1f.toDouble is 1.100000023841858.
        val floats = columnBuffer.getValues.asInstanceOf[Array[Float]]
        val doubles = new util.ArrayList[java.lang.Double](floats.length)
        floats.foreach { f =>
          doubles.add(f.toString.toDouble)
        }
        value.setDoubleVal(new TDoubleColumn(doubles, nullMasks))
      case DataType.DOUBLE =>
        val doubles = columnBuffer.getValues.asInstanceOf[Array[Double]]
        value.setDoubleVal(new TDoubleColumn(Doubles.asList(doubles: _*), nullMasks))
      case DataType.BINARY =>
        val binaries = columnBuffer.getValues.asInstanceOf[util.List[ByteBuffer]]
        value.setBinaryVal(new TBinaryColumn(binaries, nullMasks))
      case _ =>
        // Fallback: any other type (including complex types) is sent as strings.
        val strings = columnBuffer.getValues.asInstanceOf[util.List[String]]
        value.setStringVal(new TStringColumn(strings, nullMasks))
    }
    value
  }
}
+
+/**
+ * [[ThriftResultSet]] implementation which uses columns to store data.
+ */
/**
 * [[ThriftResultSet]] implementation which uses columns to store data.
 */
class ColumnOrientedResultSet(
    private val columns: Array[ColumnBuffer]) extends ThriftResultSet {

  // Offset of the first row of this set within the complete query result.
  // Initialized explicitly to 0L instead of the `= _` default initializer,
  // which is unsupported in Scala 3 and hides the actual starting value.
  private var rowOffset: Long = 0L

  /** Appends a row: fields(i) is added to the i-th column buffer. */
  override def addRow(fields: Array[Any]): Unit = {
    var i = 0
    while (i < fields.length) {
      val field = fields(i)
      columns(i).add(field)
      i += 1
    }
  }

  override def toTRowSet: TRowSet = {
    // Column-oriented transfer: the row list stays empty and the data is
    // carried entirely in the column entries.
    val tRowSet = new TRowSet(rowOffset, new util.ArrayList[TRow])
    columns.foreach { c =>
      tRowSet.addToColumns(ThriftResultSet.toTColumn(c))
    }
    tRowSet
  }

  override def setRowOffset(rowOffset: Long): Unit = this.rowOffset = rowOffset

  /** All columns hold the same number of values, so the first one gives the row count. */
  override def numRows: Int = columns.headOption.map(_.size).getOrElse(0)
}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
index f61dd30..8220c27 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/DataTypeUtils.scala
@@ -17,17 +17,10 @@
 
 package org.apache.livy.thriftserver.types
 
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.serde2.thrift.Type
-import org.apache.hive.service.cli.TableSchema
 import org.json4s.{DefaultFormats, JValue}
 import org.json4s.JsonAST.{JObject, JString}
 import org.json4s.jackson.JsonMethods.parse
 
-import org.apache.livy.thriftserver.session.DataType
-
 /**
  * Utility class for converting and representing Spark and Hive data types.
  */
object DataTypeUtils {
  // Used for JSON conversion (json4s extraction of Spark schema nodes).
  private implicit val formats = DefaultFormats

  /**
   * Converts the JSON node describing a Spark type into a [[FieldType]].
   * Primitive types appear as plain JSON strings, complex types as JSON objects.
   */
  private def toFieldType(jValue: JValue): FieldType = {
    jValue match {
      case JString(t) => BasicDataType(t)
      case o: JObject => complexToDataType(o)
      case _ => throw new IllegalArgumentException(
        s"Spark type was neither a string nor a object. It was: $jValue.")
    }
  }

  /** Converts a JSON object describing a complex Spark type (array, struct, map or udt). */
  private def complexToDataType(sparkType: JObject): FieldType = {
    (sparkType \ "type").extract[String] match {
      case "array" => ArrayType(toFieldType(sparkType \ "elementType"))
      case "struct" =>
        val fields = (sparkType \ "fields").children.map { f =>
          // TODO: get comment from metadata
          Field((f \ "name").extract[String], toFieldType(f \ "type"), "")
        }
        StructType(fields.toArray)
      case "map" =>
        MapType(toFieldType(sparkType \ "keyType"), toFieldType(sparkType \ "valueType"))
      case "udt" => toFieldType(sparkType \ "sqlType")
      case other =>
        // Fail with a descriptive error instead of the opaque MatchError the
        // non-exhaustive match would otherwise throw for unknown type names.
        throw new IllegalArgumentException(s"Unsupported Spark type: $other.")
    }
  }

  /**
   * Converts a JSON representing the Spark schema (the one returned by `df.schema.json`) into
   * a [[Schema]] instance.
   *
   * @param sparkJson a [[String]] containing the JSON representation of a Spark Dataframe schema
   * @return a [[Schema]] representing the schema provided as input
   */
  def schemaFromSparkJson(sparkJson: String): Schema = {
    val schema = parse(sparkJson) \ "fields"
    val fields = schema.children.map { field =>
      val name = (field \ "name").extract[String]
      val fieldType = toFieldType(field \ "type")
      // TODO: retrieve comment from metadata
      Field(name, fieldType, "")
    }
    Schema(fields.toArray)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/Schema.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/Schema.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/Schema.scala
new file mode 100644
index 0000000..6e06474
--- /dev/null
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/types/Schema.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver.types
+
+import org.apache.hive.service.rpc.thrift.{TColumnDesc, TPrimitiveTypeEntry, 
TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
+
+import org.apache.livy.thriftserver.session.DataType
+
/**
 * Representation of the type of a schema field.
 */
private[thriftserver] trait FieldType {
  /** Name of the type, as spelled in a Spark schema JSON. */
  def name: String
  /** [[DataType]] used when transferring values of this type over the wire. */
  def dataType: DataType
}

/**
 * Complex types (structs, arrays and maps) have no dedicated Thrift column
 * representation: they are mapped to [[DataType.STRING]], so their values travel
 * in a string column.
 */
sealed trait ComplexFieldType extends FieldType {
  override val dataType = DataType.STRING
}

/** A primitive type, identified by its Spark type name. */
case class BasicDataType(name: String) extends FieldType {

  // Any name not listed below (e.g. decimal, date, timestamp) falls back to STRING.
  override val dataType: DataType = name match {
    case "boolean" => DataType.BOOLEAN
    case "byte" => DataType.BYTE
    case "short" => DataType.SHORT
    case "integer" => DataType.INTEGER
    case "long" => DataType.LONG
    case "float" => DataType.FLOAT
    case "double" => DataType.DOUBLE
    case "binary" => DataType.BINARY
    case _ => DataType.STRING
  }
}

case class StructType(fields: Array[Field]) extends ComplexFieldType {
  // NOTE(review): the Array parameter gives this case class reference-based
  // equals/hashCode — confirm no caller relies on structural equality.
  override val name = "struct"
}

case class ArrayType(elementsType: FieldType) extends ComplexFieldType {
  override val name = "array"
}

case class MapType(keyType: FieldType, valueType: FieldType) extends ComplexFieldType {
  override val name = "map"
}

/** A named schema field, with its type and an (optional, possibly empty) comment. */
case class Field(name: String, fieldType: FieldType, comment: String)
+
/**
 * Schema of a result set: an ordered collection of named, typed fields.
 */
class Schema(val fields: Array[Field]) {

  /** Builds the Thrift representation of this schema, one column descriptor per field. */
  def toTTableSchema: TTableSchema = {
    val schema = new TTableSchema()
    var position = 0
    while (position < fields.length) {
      schema.addToColumns(Schema.columnDescriptor(fields(position), position))
      position += 1
    }
    schema
  }
}
+
object Schema {
  /** Builds a [[Schema]] from a varargs sequence of fields. */
  def apply(fields: Field*): Schema = new Schema(fields.toArray)

  /** Builds a [[Schema]] from an array of fields. */
  def apply(fields: Array[Field]): Schema = new Schema(fields)

  /**
   * Builds a [[Schema]] by pairing each name with the type at the same index.
   * The resulting fields carry an empty comment. Both arrays must have equal length.
   */
  def apply(names: Array[String], types: Array[FieldType]): Schema = {
    assert(names.length == types.length)
    apply(names.zip(types).map { case (n, dt) => Field(n, dt, "") })
  }

  /**
   * Thrift descriptor for a single column.
   * NOTE(review): `index` is 0-based here — confirm clients do not expect
   * Hive's 1-based column positions.
   */
  private def columnDescriptor(field: Field, index: Int): TColumnDesc = {
    val desc = new TColumnDesc
    desc.setColumnName(field.name)
    desc.setComment(field.comment)
    desc.setTypeDesc(toTTypeDesc(field.fieldType.dataType))
    desc.setPosition(index)
    desc
  }

  /** Maps a Livy [[DataType]] to a Thrift primitive type descriptor (STRING as fallback). */
  private def toTTypeDesc(dt: DataType): TTypeDesc = {
    val typeId = dt match {
      case DataType.BOOLEAN => TTypeId.BOOLEAN_TYPE
      case DataType.BYTE => TTypeId.TINYINT_TYPE
      case DataType.SHORT => TTypeId.SMALLINT_TYPE
      case DataType.INTEGER => TTypeId.INT_TYPE
      case DataType.LONG => TTypeId.BIGINT_TYPE
      case DataType.FLOAT => TTypeId.FLOAT_TYPE
      case DataType.DOUBLE => TTypeId.DOUBLE_TYPE
      case DataType.BINARY => TTypeId.BINARY_TYPE
      case _ => TTypeId.STRING_TYPE
    }
    val desc = new TTypeDesc
    desc.addToTypes(TTypeEntry.primitiveEntry(new TPrimitiveTypeEntry(typeId)))
    desc
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ui/ThriftJsonServlet.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ui/ThriftJsonServlet.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ui/ThriftJsonServlet.scala
index a7db3e9..ad1b5dc 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ui/ThriftJsonServlet.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ui/ThriftJsonServlet.scala
@@ -35,7 +35,7 @@ class ThriftJsonServlet(val basePath: String) extends 
JsonServlet {
 
   get("/sessions") {
     val thriftSessions = LivyThriftServer.getInstance.map { server =>
-      val sessionManager = server.getSessionManager()
+      val sessionManager = server.getSessionManager
       sessionManager.getSessions.map { sessionHandle =>
         val info = sessionManager.getSessionInfo(sessionHandle)
         SessionInfo(sessionHandle.getSessionId.toString,

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
index f1fb247..9a7823d 100644
--- 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
+++ 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerBaseTest.scala
@@ -19,9 +19,7 @@ package org.apache.livy.thriftserver
 
 import java.sql.{Connection, DriverManager, Statement}
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hive.jdbc.HiveDriver
-import org.apache.hive.service.Service.STATE
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.livy.LivyConf
@@ -56,13 +54,8 @@ abstract class ThriftServerBaseTest extends FunSuite with 
BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     Class.forName(classOf[HiveDriver].getCanonicalName)
-    livyConf.set(s"livy.${HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE}", 
mode.toString)
-    val portConfKey = if (mode == ServerMode.http) {
-      s"livy.${HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}"
-    } else {
-      s"livy.${HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT}"
-    }
-    livyConf.set(portConfKey, port.toString)
+    livyConf.set(LivyConf.THRIFT_TRANSPORT_MODE, mode.toString)
+    livyConf.set(LivyConf.THRIFT_SERVER_PORT, port)
 
     // Set formatted Spark and Scala version into livy configuration, this 
will be used by
     // session creation.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
----------------------------------------------------------------------
diff --git 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index 6eea6f3..3099436 100644
--- 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -77,7 +77,7 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest 
with CommonThriftTest
 
   test("Reuse existing session") {
     withJdbcConnection { _ =>
-      val sessionManager = LivyThriftServer.getInstance.get.getSessionManager()
+      val sessionManager = LivyThriftServer.getInstance.get.getSessionManager
       val sessionHandle = sessionManager.getSessions.head
       // Blocks until the session is ready
       val session = sessionManager.getLivySession(sessionHandle)
@@ -106,7 +106,7 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest 
with CommonThriftTest
       val s1 = c.createStatement()
       s1.execute(s"create database $db")
       s1.close()
-      val sessionManager = LivyThriftServer.getInstance.get.getSessionManager()
+      val sessionManager = LivyThriftServer.getInstance.get.getSessionManager
       val sessionHandle = sessionManager.getSessions.head
       // Blocks until the session is ready
       val session = sessionManager.getLivySession(sessionHandle)

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
----------------------------------------------------------------------
diff --git 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
index 6a6bec3..03bad5d 100644
--- 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
+++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/RegisterSessionJob.java
@@ -25,7 +25,6 @@ import org.apache.livy.JobContext;
  * job context.
  */
 public class RegisterSessionJob implements Job<Boolean> {
-
   private final String sessionId;
 
   public RegisterSessionJob() {
@@ -41,5 +40,4 @@ public class RegisterSessionJob implements Job<Boolean> {
     ThriftSessionState.create(ctx, sessionId);
     return true;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/47d3ee6b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/StatementState.java
----------------------------------------------------------------------
diff --git 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/StatementState.java
 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/StatementState.java
index abbe866..5238845 100644
--- 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/StatementState.java
+++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/StatementState.java
@@ -22,8 +22,6 @@ import java.util.Iterator;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
-import org.apache.livy.JobContext;
-
 /**
  * State related to one user statement.
  */

Reply via email to