This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 6166da46337a75d0131c591e4b3aa339961514d2
Author: Kurt Deschler <[email protected]>
AuthorDate: Thu Nov 16 13:11:06 2023 -0500

    HIVE-27873: Fix getOperationStatus and optimize fetch (Kurt Deschler, 
reviewed by Attila Turoczy, Denys Kuzmenko)
    
    This patch fixes a major performance issue fetching results from Impala.
    The problem was that Impala does not set isHasResultSet during
    getOperationStatus() calls, so that RPC was issued, and a completion
    message logged, for every row fetched. The patch also optimizes the
    fetch path to minimize conditional checks in the fast path.
    
    Closes #4902
---
 .../org/apache/hive/jdbc/HiveQueryResultSet.java   | 120 +++++++++++----------
 1 file changed, 63 insertions(+), 57 deletions(-)

diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index 7acb852cb95..375d1165248 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -53,6 +53,7 @@ import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TTableSchema;
 import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
 import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,17 +67,18 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
 
   private TCLIService.Iface client;
   private TOperationHandle stmtHandle;
+  private TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
+  private boolean check_operation_status;
   private int maxRows;
   private int fetchSize;
-  private int rowsFetched = 0;
+  private long rowsFetched = 0;
+  private boolean fetchDone = false;
 
   private RowSet fetchedRows;
   private Iterator<Object[]> fetchedRowsItr;
   private boolean isClosed = false;
   private boolean emptyResultSet = false;
   private boolean isScrollable = false;
-  private boolean fetchFirst = false;
-  private TGetOperationStatusResp operationStatus = null;
 
   private final TProtocolVersion protocol;
 
@@ -182,11 +184,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet 
{
     }
     this.emptyResultSet = builder.emptyResultSet;
     this.maxRows = builder.maxRows;
-    if (builder.emptyResultSet) {
-      this.maxRows = 0;
-    }
+    check_operation_status = (statement instanceof HiveStatement);
     this.isScrollable = builder.isScrollable;
     this.protocol = builder.getProtocolVersion();
+    InitEmptyIterator();
   }
 
   /**
@@ -271,6 +272,15 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     colNames.forEach(i -> normalizedColumnNames.add(i.toLowerCase()));
   }
 
+  private void InitEmptyIterator() throws SQLException {
+    try {
+      fetchedRows = RowSetFactory.create(new TRowSet(), protocol);
+      fetchedRowsItr = fetchedRows.iterator();
+    } catch (TException e) {
+      throw new SQLException(e);
+    }
+  }
+
   @Override
   public void close() throws SQLException {
     if (this.statement != null && (this.statement instanceof HiveStatement)) {
@@ -290,7 +300,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     client = null;
     stmtHandle = null;
     isClosed = true;
-    operationStatus = null;
+    InitEmptyIterator();
   }
 
   private void closeOperationHandle(TOperationHandle stmtHandle) throws 
SQLException {
@@ -307,66 +317,55 @@ public class HiveQueryResultSet extends HiveBaseResultSet 
{
     }
   }
 
-  /**
-   * Moves the cursor down one row from its current position.
-   *
-   * @see java.sql.ResultSet#next()
-   * @throws SQLException
-   *           if a database access error occurs.
-   */
-  public boolean next() throws SQLException {
+  private boolean nextRowBatch() throws SQLException {
     if (isClosed) {
       throw new SQLException("Resultset is closed");
     }
-    if (emptyResultSet || (maxRows > 0 && rowsFetched >= maxRows)) {
+    if ((maxRows > 0 && rowsFetched >= maxRows) || emptyResultSet || 
fetchDone) {
       return false;
     }
-
-    /*
-     * Poll on the operation status, till the operation is complete.
-     * We need to wait only for HiveStatement to complete.
-     * HiveDatabaseMetaData which also uses this ResultSet returns only after 
the RPC is complete.
-     */
-    // when isHasResultSet is set, the query transitioned from running -> 
complete and is not expected go back to
-    // running state when fetching results (implicit state transition)
-    if ((statement instanceof HiveStatement) && (operationStatus == null || 
!operationStatus.isHasResultSet())) {
-      operationStatus = ((HiveStatement) 
statement).waitForOperationToComplete();
+    if (check_operation_status) {
+      TGetOperationStatusResp operationStatus =
+        ((HiveStatement) statement).waitForOperationToComplete();
+      check_operation_status = false;
     }
 
     try {
-      TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
-      if (fetchFirst) {
-        // If we are asked to start from beginning, clear the current fetched 
resultset
-        orientation = TFetchOrientation.FETCH_FIRST;
-        fetchedRows = null;
-        fetchedRowsItr = null;
-        fetchFirst = false;
-      }
-      if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
-        TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
-            orientation, fetchSize);
-        LOG.debug("HiveQueryResultsFetchReq: {}", fetchReq);
-        TFetchResultsResp fetchResp;
-        fetchResp = client.FetchResults(fetchReq);
-        Utils.verifySuccessWithInfo(fetchResp.getStatus());
-
-        TRowSet results = fetchResp.getResults();
-        fetchedRows = RowSetFactory.create(results, protocol);
-        fetchedRowsItr = fetchedRows.iterator();
+      int fetchSizeBounded = fetchSize;
+      if (maxRows > 0 && rowsFetched + fetchSize > maxRows) {
+        fetchSizeBounded = maxRows - (int)rowsFetched;
       }
+      TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
+          orientation, fetchSizeBounded);
+      TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+      Utils.verifySuccessWithInfo(fetchResp.getStatus());
+      fetchDone = !fetchResp.isHasMoreRows();
+
+      fetchedRows = RowSetFactory.create(fetchResp.getResults(), protocol);
+    } catch (TException ex) {
+      ex.printStackTrace();
+      throw new SQLException("Error retrieving next row", ex);
+    }
 
-      if (!fetchedRowsItr.hasNext()) {
-        return false;
-      }
+    orientation = TFetchOrientation.FETCH_NEXT;
+    fetchedRowsItr = fetchedRows.iterator();
 
-      row = fetchedRowsItr.next();
-      rowsFetched++;
-    } catch (SQLException eS) {
-      throw eS;
-    } catch (Exception ex) {
-      throw new SQLException("Error retrieving next row", ex);
+    return fetchedRowsItr.hasNext();
+  }
+
+  /**
+   * Moves the cursor down one row from its current position.
+   *
+   * @see java.sql.ResultSet#next()
+   * @throws SQLException
+   *           if a database access error occurs.
+   */
+  public boolean next() throws SQLException {
+    if (!fetchedRowsItr.hasNext() && !nextRowBatch()) {
+      return false;
     }
-    // NOTE: fetchOne doesn't throw new 
SQLFeatureNotSupportedException("Method not supported").
+    row = fetchedRowsItr.next();
+    rowsFetched++;
     return true;
   }
 
@@ -430,8 +429,12 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     if (!isScrollable) {
       throw new SQLException("Method not supported for TYPE_FORWARD_ONLY 
resultset");
     }
-    fetchFirst = true;
+
+    // If we are asked to start from begining, clear the current fetched 
resultset
+    InitEmptyIterator();
+    orientation = TFetchOrientation.FETCH_FIRST;
     rowsFetched = 0;
+    fetchDone = false;
   }
 
   @Override
@@ -444,7 +447,10 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
 
   @Override
   public int getRow() throws SQLException {
-    return rowsFetched;
+    if (rowsFetched > Integer.MAX_VALUE) {
+      throw new SQLException("getRow() result exceeds Int.MAX_VALUE");
+    }
+    return (int)rowsFetched;
   }
 
   @Override

Reply via email to