Repository: spark
Updated Branches:
  refs/heads/master bca43cd63 -> e17a76efd


[SPARK-16563][SQL] fix spark sql thrift server FetchResults bug

## What changes were proposed in this pull request?

Add a constant iterator that points to the head of the result. This header
iterator is used to reset the iterator when results are fetched from the
first row repeatedly.
JIRA ticket https://issues.apache.org/jira/browse/SPARK-16563

## How was this patch tested?

This bug was found when using Cloudera HUE connected to the Spark SQL Thrift
server: currently, the result of a SQL statement can be fetched only once.
The fix was tested manually with Cloudera HUE. With this fix, HUE can fetch
Spark SQL results repeatedly through the Thrift server.

Author: Alice <alice.g...@gmail.com>
Author: Alice <g...@garena.com>

Closes #14218 from alicegugu/SparkSQLFetchResultsBug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e17a76ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e17a76ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e17a76ef

Branch: refs/heads/master
Commit: e17a76efdb44837c38388a4d0e62436065cd4dc9
Parents: bca43cd
Author: Alice <alice.g...@gmail.com>
Authored: Mon Aug 8 18:00:04 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Aug 8 18:00:04 2016 -0700

----------------------------------------------------------------------
 .../SparkExecuteStatementOperation.scala        | 12 +++++
 .../thriftserver/HiveThriftServer2Suites.scala  | 48 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e17a76ef/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e8bcdd7..b2717ec 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -51,6 +51,7 @@ private[hive] class SparkExecuteStatementOperation(
 
   private var result: DataFrame = _
   private var iter: Iterator[SparkRow] = _
+  private var iterHeader: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
@@ -110,6 +111,14 @@ private[hive] class SparkExecuteStatementOperation(
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
     val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, 
getProtocolVersion)
+
+    // Reset iter to header when fetching start from first row
+    if (order.equals(FetchOrientation.FETCH_FIRST)) {
+      val (ita, itb) = iterHeader.duplicate
+      iter = ita
+      iterHeader = itb
+    }
+
     if (!iter.hasNext) {
       resultRowSet
     } else {
@@ -228,6 +237,9 @@ private[hive] class SparkExecuteStatementOperation(
           result.collect().iterator
         }
       }
+      val (itra, itrb) = iter.duplicate
+      iterHeader = itra
+      iter = itrb
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
     } catch {
       case e: HiveSQLException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e17a76ef/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index e388c2a..8f2c4fa 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -36,6 +36,8 @@ import org.apache.hive.service.auth.PlainSaslHelper
 import org.apache.hive.service.cli.GetInfoType
 import org.apache.hive.service.cli.thrift.TCLIService.Client
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
+import org.apache.hive.service.cli.FetchOrientation
+import org.apache.hive.service.cli.FetchType
 import org.apache.thrift.protocol.TBinaryProtocol
 import org.apache.thrift.transport.TSocket
 import org.scalatest.BeforeAndAfterAll
@@ -91,6 +93,52 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest 
{
     }
   }
 
+  test("SPARK-16563 ThriftCLIService FetchResults repeat fetching result") {
+    withCLIServiceClient { client =>
+      val user = System.getProperty("user.name")
+      val sessionHandle = client.openSession(user, "")
+
+      withJdbcStatement { statement =>
+        val queries = Seq(
+          "DROP TABLE IF EXISTS test_16563",
+          "CREATE TABLE test_16563(key INT, val STRING)",
+          s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE 
test_16563")
+
+        queries.foreach(statement.execute)
+        val confOverlay = new java.util.HashMap[java.lang.String, 
java.lang.String]
+        val operationHandle = client.executeStatement(
+          sessionHandle,
+          "SELECT * FROM test_16563",
+          confOverlay)
+
+        // Fetch result first time
+        assertResult(5, "Fetching result first time from next row") {
+
+          val rows_next = client.fetchResults(
+            operationHandle,
+            FetchOrientation.FETCH_NEXT,
+            1000,
+            FetchType.QUERY_OUTPUT)
+
+          rows_next.numRows()
+        }
+
+        // Fetch result second time from first row
+        assertResult(5, "Repeat fetching result from first row") {
+
+          val rows_first = client.fetchResults(
+            operationHandle,
+            FetchOrientation.FETCH_FIRST,
+            1000,
+            FetchType.QUERY_OUTPUT)
+
+          rows_first.numRows()
+        }
+        statement.executeQuery("DROP TABLE IF EXISTS test_16563")
+      }
+    }
+  }
+
   test("JDBC query execution") {
     withJdbcStatement { statement =>
       val queries = Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to