This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0-preview
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0-preview by this 
push:
     new 8640b90  [SPARK-29349][SQL] Support FETCH_PRIOR in Thriftserver fetch 
request
8640b90 is described below

commit 8640b90a3e69c61b93afdee8a71180810ccf25e5
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Tue Oct 15 23:22:19 2019 -0700

    [SPARK-29349][SQL] Support FETCH_PRIOR in Thriftserver fetch request
    
    ### What changes were proposed in this pull request?
    
    Support FETCH_PRIOR fetching in Thriftserver, and report correct fetch 
start offset it TFetchResultsResp.results.startRowOffset
    
    The semantics of FETCH_PRIOR are as follow: Assuming the previous fetch 
returned a block of rows from offsets [10, 20)
    * calling FETCH_PRIOR(maxRows=5) will scroll back and return rows [5, 10)
    * calling FETCH_PRIOR(maxRows=10) again, will scroll back, but can't go 
earlier than 0. It will nevertheless return 10 rows, returning rows [0, 10) 
(overlapping with the previous fetch)
    * calling FETCH_PRIOR(maxRows=4) again will again return rows starting from 
offset 0 - [0, 4)
    * calling FETCH_NEXT(maxRows=6) after that will move the cursor forward and 
return rows [4, 10)
    
    ##### Client/server backwards/forwards compatibility:
    
    Old driver with new server:
    * Drivers that don't support FETCH_PRIOR will not attempt to use it
    * Field TFetchResultsResp.results.startRowOffset was not set, old drivers 
don't depend on it.
    
    New driver with old server
    * Using an older thriftserver with FETCH_PRIOR will make the thriftserver 
return unsupported operation error. The driver can then recognize that it's an 
old server.
    * Older thriftserver will return 
TFetchResultsResp.results.startRowOffset=0. If the client driver receives 0, it 
can know that it can not rely on it as correct offset. If the client driver 
intentionally wants to fetch from 0, it can use FETCH_FIRST.
    
    ### Why are the changes needed?
    
    It's intended to be used to recover after connection errors. If a client 
lost connection during fetching (e.g. of rows [10, 20)), and wants to reconnect 
and continue, it could not know whether the request  got lost before reaching 
the server, or on the response back. When it issued another FETCH_NEXT(10) 
request after reconnecting, because TFetchResultsResp.results.startRowOffset 
was not set, it could not know if the server will return rows [10,20) (because 
the previous request didn't [...]
    
    Driver should always use FETCH_PRIOR after a broken connection.
    * If the Thriftserver returns unsuported operation error, the driver knows 
that it's an old server that doesn't support it. The driver then must error the 
query, as it will also not support returning the correct startRowOffset, so the 
driver cannot reliably guarantee if it hadn't lost any rows on the fetch cursor.
    * If the driver gets a response to FETCH_PRIOR, it should also have a 
correctly set startRowOffset, which the driver can use to position itself back 
where it left off before the connection broke.
    * If FETCH_NEXT was used after a broken connection on the first fetch, and 
returned with an startRowOffset=0, then the client driver can't know if it's 0 
because it's the older server version, or if it's genuinely 0. Better to call 
FETCH_PRIOR, as scrolling back may anyway be possibly required after a broken 
connection.
    
    This way it is implemented in a backwards/forwards compatible way, and 
doesn't require bumping the protocol version. FETCH_ABSOLUTE might have been 
better, but that would require a bigger protocol change, as there is currently 
no field to specify the requested absolute offset.
    
    ### Does this PR introduce any user-facing change?
    
    ODBC/JDBC drivers connecting to Thriftserver may now implement using the 
FETCH_PRIOR fetch order to scroll back in query results, and check 
TFetchResultsResp.results.startRowOffset if their cursor position is consistent 
after connection errors.
    
    ### How was this patch tested?
    
    Added tests to HiveThriftServer2Suites
    
    Closes #26014 from juliuszsompolski/SPARK-29349.
    
    Authored-by: Juliusz Sompolski <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../SparkExecuteStatementOperation.scala           | 37 ++++++++-
 .../thriftserver/HiveThriftServer2Suites.scala     | 89 +++++++++++++++++++++-
 sql/hive-thriftserver/v1.2.1/if/TCLIService.thrift |  5 +-
 .../hive/service/cli/operation/Operation.java      |  5 +-
 sql/hive-thriftserver/v2.3.5/if/TCLIService.thrift |  5 +-
 .../hive/service/cli/operation/Operation.java      |  5 +-
 6 files changed, 134 insertions(+), 12 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 9ca6c39..90a428d 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -57,7 +57,8 @@ private[hive] class SparkExecuteStatementOperation(
   // This is only used when `spark.sql.thriftServer.incrementalCollect` is set 
to `false`.
   // In case of `true`, this will be `None` and FETCH_FIRST will trigger 
re-execution.
   private var resultList: Option[Array[SparkRow]] = _
-
+  private var previousFetchEndOffset: Long = 0
+  private var previousFetchStartOffset: Long = 0
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
@@ -113,14 +114,18 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = 
withSchedulerPool {
+    log.info(s"Received getNextRowSet request order=${order} and 
maxRowsL=${maxRowsL} " +
+      s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
     val resultRowSet: RowSet =
       ThriftserverShimUtils.resultRowSet(getResultSetSchema, 
getProtocolVersion)
 
-    // Reset iter to header when fetching start from first row
-    if (order.equals(FetchOrientation.FETCH_FIRST)) {
+    // Reset iter when FETCH_FIRST or FETCH_PRIOR
+    if ((order.equals(FetchOrientation.FETCH_FIRST) ||
+        order.equals(FetchOrientation.FETCH_PRIOR)) && previousFetchEndOffset 
!= 0) {
+      // Reset the iterator to the beginning of the query.
       iter = if 
(sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
         resultList = None
         result.toLocalIterator.asScala
@@ -132,6 +137,28 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
 
+    var resultOffset = {
+      if (order.equals(FetchOrientation.FETCH_FIRST)) {
+        logInfo(s"FETCH_FIRST request with $statementId. Resetting to 
resultOffset=0")
+        0
+      } else if (order.equals(FetchOrientation.FETCH_PRIOR)) {
+        // TODO: FETCH_PRIOR should be handled more efficiently than rewinding 
to beginning and
+        // reiterating.
+        val targetOffset = math.max(previousFetchStartOffset - maxRowsL, 0)
+        logInfo(s"FETCH_PRIOR request with $statementId. Resetting to 
resultOffset=$targetOffset")
+        var off = 0
+        while (off < targetOffset && iter.hasNext) {
+          iter.next()
+          off += 1
+        }
+        off
+      } else { // FETCH_NEXT
+        previousFetchEndOffset
+      }
+    }
+
+    resultRowSet.setStartOffset(resultOffset)
+    previousFetchStartOffset = resultOffset
     if (!iter.hasNext) {
       resultRowSet
     } else {
@@ -152,7 +179,11 @@ private[hive] class SparkExecuteStatementOperation(
         }
         resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
         curRow += 1
+        resultOffset += 1
       }
+      previousFetchEndOffset = resultOffset
+      log.info(s"Returning result set with ${curRow} rows from offsets " +
+        s"[$previousFetchStartOffset, $previousFetchEndOffset) with 
$statementId")
       resultRowSet
     }
   }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 8a5526e..3c8d25d 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, DriverManager, SQLException, Statement}
 import java.util.{Locale, UUID}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -34,7 +35,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 import org.apache.hive.service.auth.PlainSaslHelper
-import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType}
+import org.apache.hive.service.cli.{FetchOrientation, FetchType, GetInfoType, 
RowSet}
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
 import org.apache.thrift.protocol.TBinaryProtocol
 import org.apache.thrift.transport.TSocket
@@ -684,6 +685,92 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
       
assert(e.getMessage.contains("org.apache.spark.sql.catalyst.parser.ParseException"))
     }
   }
+
+  test("ThriftCLIService FetchResults FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR") {
+    def checkResult(rows: RowSet, start: Long, end: Long): Unit = {
+      assert(rows.getStartOffset() == start)
+      assert(rows.numRows() == end - start)
+      rows.iterator.asScala.zip((start until end).iterator).foreach { case 
(row, v) =>
+        assert(row(0).asInstanceOf[Long] === v)
+      }
+    }
+
+    withCLIServiceClient { client =>
+      val user = System.getProperty("user.name")
+      val sessionHandle = client.openSession(user, "")
+
+      val confOverlay = new java.util.HashMap[java.lang.String, 
java.lang.String]
+      val operationHandle = client.executeStatement(
+        sessionHandle,
+        "SELECT * FROM range(10)",
+        confOverlay) // 10 rows result with sequence 0, 1, 2, ..., 9
+      var rows: RowSet = null
+
+      // Fetch 5 rows with FETCH_NEXT
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 5, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 0, 5) // fetched [0, 5)
+
+      // Fetch another 2 rows with FETCH_NEXT
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 2, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 5, 7) // fetched [5, 7)
+
+      // FETCH_PRIOR 3 rows
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_PRIOR, 3, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 2, 5) // fetched [2, 5)
+
+      // FETCH_PRIOR again will scroll back to 0, and then the returned result
+      // may overlap the results of previous FETCH_PRIOR
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_PRIOR, 3, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 0, 3) // fetched [0, 3)
+
+      // FETCH_PRIOR again will stay at 0
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_PRIOR, 4, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 0, 4) // fetched [0, 4)
+
+      // FETCH_NEXT will continue moving forward from offset 4
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 10, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 4, 10) // fetched [4, 10) until the end of results
+
+      // FETCH_NEXT is at end of results
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 5, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 10, 10) // fetched empty [10, 10) (at end of results)
+
+      // FETCH_NEXT is at end of results again
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 2, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 10, 10) // fetched empty [10, 10) (at end of results)
+
+      // FETCH_PRIOR 1 rows yet again
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_PRIOR, 1, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 9, 10) // fetched [9, 10)
+
+      // FETCH_NEXT will return 0 yet again
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 5, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 10, 10) // fetched empty [10, 10) (at end of results)
+
+      // FETCH_FIRST results from first row
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_FIRST, 3, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 0, 3) // fetch [0, 3)
+
+      // Fetch till the end rows with FETCH_NEXT"
+      rows = client.fetchResults(
+        operationHandle, FetchOrientation.FETCH_NEXT, 1000, 
FetchType.QUERY_OUTPUT)
+      checkResult(rows, 3, 10) // fetched [3, 10)
+
+      client.closeOperation(operationHandle)
+      client.closeSession(sessionHandle)
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive-thriftserver/v1.2.1/if/TCLIService.thrift 
b/sql/hive-thriftserver/v1.2.1/if/TCLIService.thrift
index 7cd6fa3..225e319 100644
--- a/sql/hive-thriftserver/v1.2.1/if/TCLIService.thrift
+++ b/sql/hive-thriftserver/v1.2.1/if/TCLIService.thrift
@@ -1028,7 +1028,6 @@ enum TFetchOrientation {
   FETCH_NEXT,
 
   // Get the previous rowset. The fetch offset is ignored.
-  // NOT SUPPORTED
   FETCH_PRIOR,
 
   // Return the rowset at the given fetch offset relative
@@ -1056,8 +1055,8 @@ struct TFetchResultsReq {
   // Operation from which to fetch results.
   1: required TOperationHandle operationHandle
 
-  // The fetch orientation. For V1 this must be either
-  // FETCH_NEXT or FETCH_FIRST. Defaults to FETCH_NEXT.
+  // The fetch orientation. This must be either
+  // FETCH_NEXT, FETCH_PRIOR or FETCH_FIRST. Defaults to FETCH_NEXT.
   2: required TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT
   
   // Max number of rows that should be returned in
diff --git 
a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/Operation.java
 
b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index 19153b6..51bb287 100644
--- 
a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ 
b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -58,7 +58,10 @@ public abstract class Operation {
   private long lastAccessTime;
 
   protected static final EnumSet<FetchOrientation> 
DEFAULT_FETCH_ORIENTATION_SET =
-      EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
+      EnumSet.of(
+          FetchOrientation.FETCH_NEXT,
+          FetchOrientation.FETCH_FIRST,
+          FetchOrientation.FETCH_PRIOR);
 
   protected Operation(HiveSession parentSession, OperationType opType, boolean 
runInBackground) {
     this.parentSession = parentSession;
diff --git a/sql/hive-thriftserver/v2.3.5/if/TCLIService.thrift 
b/sql/hive-thriftserver/v2.3.5/if/TCLIService.thrift
index 824b049..9026cd2 100644
--- a/sql/hive-thriftserver/v2.3.5/if/TCLIService.thrift
+++ b/sql/hive-thriftserver/v2.3.5/if/TCLIService.thrift
@@ -1105,7 +1105,6 @@ enum TFetchOrientation {
   FETCH_NEXT,
 
   // Get the previous rowset. The fetch offset is ignored.
-  // NOT SUPPORTED
   FETCH_PRIOR,
 
   // Return the rowset at the given fetch offset relative
@@ -1133,8 +1132,8 @@ struct TFetchResultsReq {
   // Operation from which to fetch results.
   1: required TOperationHandle operationHandle
 
-  // The fetch orientation. For V1 this must be either
-  // FETCH_NEXT or FETCH_FIRST. Defaults to FETCH_NEXT.
+  // The fetch orientation. This must be either
+  // FETCH_NEXT, FETCH_PRIOR or FETCH_FIRST. Defaults to FETCH_NEXT.
   2: required TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT
 
   // Max number of rows that should be returned in
diff --git 
a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/Operation.java
 
b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index 788fcde..f26c715 100644
--- 
a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ 
b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -64,7 +64,10 @@ public abstract class Operation {
   protected final QueryState queryState;
 
   protected static final EnumSet<FetchOrientation> 
DEFAULT_FETCH_ORIENTATION_SET =
-      EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
+      EnumSet.of(
+          FetchOrientation.FETCH_NEXT,
+          FetchOrientation.FETCH_FIRST,
+          FetchOrientation.FETCH_PRIOR);
 
   protected Operation(HiveSession parentSession, OperationType opType) {
     this(parentSession, null, opType);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to